diff options
authorPaul J. Davis <>2020-09-03 13:28:19 -0500
committerPaul J. Davis <>2020-09-15 12:49:02 -0500
commit15ff5802612bfd39f0cc6f4d7f0c3fd993552f7d (patch)
parent56e0f9c936af75d4206346214cd836206f4b629a (diff)
Remove couch_rate
This implementation was difficult to understand and had behavior that was too difficult to predict. It would break if view behavior changed in significant ways from what was originally expected.
27 files changed, 35 insertions, 1848 deletions
diff --git a/.credo.exs b/.credo.exs
index 112561b95..bd26f407c 100644
--- a/.credo.exs
+++ b/.credo.exs
@@ -37,7 +37,6 @@
- ~r"/src/stream_data",
diff --git a/.gitignore b/.gitignore
index 5c4255245..c84d39e5d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -78,7 +78,6 @@ src/rebar/
diff --git a/Makefile b/Makefile
index 2e3cc8acb..e8d366296 100644
--- a/Makefile
+++ b/Makefile
@@ -165,7 +165,7 @@ check: all
@$(MAKE) emilio
make eunit apps=couch_eval,couch_expiring_cache,ctrace,couch_jobs,couch_views,fabric,mango,chttpd
make elixir tests=test/elixir/test/basics_test.exs,test/elixir/test/replication_test.exs,test/elixir/test/map_test.exs,test/elixir/test/all_docs_test.exs,test/elixir/test/bulk_docs_test.exs
- make exunit apps=couch_rate,chttpd
+ make exunit apps=chttpd
make mango-test
.PHONY: eunit
diff --git a/mix.exs b/mix.exs
index 480d426b1..29c81fa49 100644
--- a/mix.exs
+++ b/mix.exs
@@ -49,14 +49,11 @@ defmodule CouchDBTest.Mixfile do
# Run "mix help" to learn about applications.
def application do
- extra_applications: extra_applications(Mix.env()),
+ extra_applications: [:logger],
applications: [:httpotion]
- defp extra_applications(:test), do: [:logger, :stream_data]
- defp extra_applications(_), do: [:logger]
# Specifies which paths to compile per environment.
defp elixirc_paths(:test), do: ["test/elixir/lib", "test/elixir/test/support"]
defp elixirc_paths(:integration), do: ["test/elixir/lib", "test/elixir/test/support"]
@@ -71,8 +68,7 @@ defmodule CouchDBTest.Mixfile do
{:jiffy, path: Path.expand("src/jiffy", __DIR__)},
path: Path.expand("src/ibrowse", __DIR__), override: true, compile: false},
- {:credo, "~> 1.2.0", only: [:dev, :test, :integration], runtime: false},
- {:stream_data, "~> 0.4.3", only: [:dev, :test, :integration], runtime: false}
+ {:credo, "~> 1.2.0", only: [:dev, :test, :integration], runtime: false}
diff --git a/mix.lock b/mix.lock
index 7a155c6bb..c03e11f64 100644
--- a/mix.lock
+++ b/mix.lock
@@ -14,6 +14,5 @@
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"},
- "stream_data": {:hex, :stream_data, "0.4.3", "62aafd870caff0849a5057a7ec270fad0eb86889f4d433b937d996de99e3db25", [:mix], [], "hexpm", "7dafd5a801f0bc897f74fcd414651632b77ca367a7ae4568778191fc3bf3a19a"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"},
diff --git a/rebar.config.script b/rebar.config.script
index 963d97fb1..f3a975032 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -123,7 +123,6 @@ SubDirs = [
- "src/couch_rate",
diff --git a/rel/files/eunit.ini b/rel/files/eunit.ini
index 20277f288..2b73ab307 100644
--- a/rel/files/eunit.ini
+++ b/rel/files/eunit.ini
@@ -40,6 +40,3 @@ startup_jitter = 0
; disable index auto-updater to avoid interfering with some of the tests
index_updater_enabled = false
-opts = #{budget => 100, target => 500, window => 6000, sensitivity => 200, congested_delay => 1} \ No newline at end of file
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 071359a2f..8f2b25e22 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -727,11 +727,6 @@ compaction = false
; all = (#{}) -> true
-limiter = couch_rate_limiter
-opts = #{budget => 100, target => 2500, window => 60000, sensitivity => 1000}
; Some low-level FDB transaction options. These options will be applied to the
; database handle and inherited by each transaction started with that handle.
; The description of these can be found in fdb_c_option.g.h include file from
diff --git a/src/couch_rate/ b/src/couch_rate/
deleted file mode 100644
index 530da1a99..000000000
--- a/src/couch_rate/
+++ /dev/null
@@ -1,155 +0,0 @@
-# Description
-The `couch_rate` application implements a generic rate limiter which can
-be used to control batch size and delay between batches. It was initially
-designed for background index build to find an optimal batch size to utilize
-the FDB transaction up to configured `target` parameter. The application
-provides an API to plug custom rate limiting logic when need to.
-# Default rate limit logic
-The `couch_rate_limiter` is the rate limit module used by default.
-The module tracks average number of reads and writes over specified
-time period. It uses average read/write numbers to calculate an
-approximate value for read/write ratio. Then the read/write ratio is
-used to convert estimated amount of writes into batch size.
-# Configuration
-## API based usage
-In the simplest use case the only mandatory keys `new/3` expects are:
-* `budget` - the initial value for estimated batch size
-* `target` - the amount in msec which we try to maintain for batch processing time
-* `window` - time interval for contention detector
-* `sensitivity` - minimal interval within the `window`
-We choose sane default values for the rest of the parameters.
-* `window_size = window div sensitivity + 1`
-* `underload_threshold = round(target * 0.95)`
-* `overload_threshold = round(target * 1.05)`
-* `delay_threshold = round(target * 1.07)`
-Due to the use of `round` in defaults calculation the `target` cannot be less
-than `36` msec. Otherwise some of the thresholds become equal which breaks the
-In the case when you need to specify custom parameters, the following keys
-are supported:
-* `window_size` - how many batches to consider in contention detector
-* `timer` - this is used for testing to fast forward time `fun() -> current_time_in_ms() end`
-* `target` - the amount in msec which we try to maintain for batch processing time
-* `underload_threshold` - a threshold bellow which we would try to increase the budget
-* `overload_threshold` - a threshold above which we would start decreasing the budget
-* `delay_threshold` - a threshold above which we would start introducing delays between batches
-* `multiplicative_factor` - determines how fast we are going to decrease budget (must be in (0..1) range)
-* `regular_delay` - delay between batches when there is no overload
-* `congested_delay` - delay between batches when there is an overload
-* `initial_budget` - initial value for budget to start with
-## default.ini based usage
-The users of the `couch_rate` application pass the `ConfigId` parameter.
-When calling `couch_rate:new` and `couch_rate:create_if_missing`.
-The `couch_rate` application uses this information to construct name of the
-configuration section to use to get configuration parameters. The configration
-section is constructed using `"couch_rate." ++ ConfigId`.
-The parameters are encoded using erlang map syntax.
-Limitation of the map parser:
-* Keys must be atoms
-* Values are either integers or floats
-* We only support positive values in the map
-* Configuration object cannot use erlang reserved words in keys:
- `after`, `and`, `andalso`, `band`, `begin`, `bnot`, `bor`,
- `bsl`, `bsr`, `bxor`, `case`, `catch`, `cond`, `div`, `end`
- `fun`, `if`, `let`, `not`, `of`, `or`, `orelse`, `receive`
- `rem`, `try`, `when`, `xor`
-The auxilary `couch_rate_config` module implements the following API:
-* `couch_rate_config:from_str/1` - parses a string representation of parameters
-* `couch_rate_config:to_str/1` - converts parameters to string (used in testing)
-Here is the example of configuration used in `couch_view` application:
-limiter = couch_rate_limiter
-opts = #{budget => 100, target => 2500, window => 60000, sensitivity => 1000}
-In the `couch_view` application it is used as follows:
-Limiter = couch_rate:create_if_missing({DbName, DDocId}, "views"),
-# API
-The application implements two APIs. Both APIs are supported by `couch_rate`
-module. The API variants are:
-* explicit state passing
-* state store based approach
-The API is chosen baed on the `StoreModule` argument passed to `new/4`.
-Currently we support following values for `StoreModule`:
-* `nil` - this value indicates that explicit state passing would be used
-* `couch_rate_ets` - ets based global state store (ets tables are owned by app supervisor)
-* `couch_rate_pd` - process dicionary based local state store
-The "explicit state passing" style returns a tuple `{Result :: term(), state()}`.
-The result is the same as for state store based API.
-## State store based APIs of `couch_rate` module.
-All functions can return `{error, Reason :: term()}` in case of errors.
-This detail is ommited bellow.
-* `create_if_missing(Id :: id(), Module :: module(), Store :: module(), Options :: map()) -> limiter()` - create new rate limiter instance
-* `new(Id :: id(), Module :: module(), Store :: module(), Options :: map()) -> limiter()` - create new rate limiter instance
-* `budget(limiter()) -> Budget :: integer().` - get batch size
-* `delay(limiter()) -> Delay :: timeout().` - return delay in msec between batches
-* `wait(limiter()) -> ok` - block the caller for amount of time returned by `delay/1`
-* `in(limiter(), Reads :: integer()) -> limiter()` - notify rate limiter on the amount of reads were actually done (could be less than `budget`)
-* `success(limiter(), Writes :: integer()) -> limiter()` - how many writes happen
-* `failure(limiter()) -> limiter()` - called instead of `success/2` when failure happen
-* `is_congestion(limiter()) -> boolean()` - returns `false` when congestion is detected
-* `format(limiter()) -> [{Key :: atom(), Value :: term()}]` - return key value list representing important aspects of the limiter state
-* `id(limitter()) -> id()` - returns `id()` of the rate limiter
-* `module(limiter()) -> module()` - returns callback module implementing rate limiting logic.
-* `state(limiter()) -> state()` - returns internal state of rate limiter.
-* `store(limiter()) -> module() | nil` - returns store state backend.
-# Testing
-The test suite is written in Elixir.
-## Running all tests
-make couch && ERL_LIBS=`pwd`/src mix test --trace src/couch_rate/test/exunit/
-## Running specific test suite
-make couch && ERL_LIBS=`pwd`/src mix test --trace src/couch_rate/test/exunit/couch_rate_limiter_test.exs
-## Running specific test using line number
-make couch && ERL_LIBS=`pwd`/src mix test --trace src/couch_rate/test/exunit/couch_rate_limiter_test.exs:10
-## Running traces with stats output
-make couch && ERL_LIBS=`pwd`/src EXUNIT_DEBUG=true mix test --trace src/couch_rate/test/exunit/couch_rate_limiter_test.exs
-``` \ No newline at end of file
diff --git a/src/couch_rate/src/ b/src/couch_rate/src/
deleted file mode 100644
index ed6de81d6..000000000
--- a/src/couch_rate/src/
+++ /dev/null
@@ -1,24 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
- {application, couch_rate, [
- {description, "Simple rate limiter"},
- {vsn, git},
- {registered, [
- ]},
- {applications, [
- kernel,
- stdlib,
- syntax_tools
- ]},
- {mod, {couch_rate_app, []}}
diff --git a/src/couch_rate/src/couch_rate.erl b/src/couch_rate/src/couch_rate.erl
deleted file mode 100644
index 24bbcc2a5..000000000
--- a/src/couch_rate/src/couch_rate.erl
+++ /dev/null
@@ -1,318 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
- create_if_missing/2,
- create_if_missing/3,
- create_if_missing/4,
- new/2,
- new/3,
- new/4,
- from_map/4,
- budget/1,
- delay/1,
- wait/1,
- in/2,
- success/2,
- failure/1,
- is_congestion/1,
- min_latency/1,
- format/1,
- to_map/1,
- id/1,
- module/1,
- state/1,
- store/1
--define(LIMITER, ?MODULE).
--type id() :: term().
--type state() :: term().
--type store() :: module().
--opaque limiter() :: #?LIMITER{}.
- id/0,
- state/0,
- limiter/0
--spec create_if_missing(id(), string()) ->
- couch_rate:limiter() | {error, Reason :: term()}.
-create_if_missing(Id, ConfigId) ->
- ?MODULE:create_if_missing(Id, ConfigId, couch_rate_ets).
--spec create_if_missing(id(), string(), nil | module()) ->
- couch_rate:limiter() | {error, Reason :: term()}.
-create_if_missing(Id, ConfigId, StateStore) ->
- {Module, Options} = get_config(ConfigId),
- ?MODULE:create_if_missing(Id, Module, StateStore, Options).
--spec create_if_missing(id(), module(), nil | module(), map()) ->
- couch_rate:limiter() | {error, Reason :: term()}.
-create_if_missing(Id, Module, nil, Options) ->
- id = Id,
- module = Module,
- store = nil,
- state = Module:new(Id, Options)
- };
-create_if_missing(Id, Module, Store, Options) ->
- case Store:create_if_missing(Id, Module:new(Id, Options)) of
- {error, _} = Error ->
- Error;
- State ->
- id = Id,
- module = Module,
- store = Store,
- state = State
- }
- end.
--spec new(id(), string()) ->
- couch_rate:limiter() | {error, Reason :: term()}.
-new(Id, ConfigId) ->
- ?MODULE:new(Id, ConfigId, couch_rate_ets).
--spec new(id(), string(), module()) ->
- couch_rate:limiter() | {error, Reason :: term()}.
-new(Id, ConfigId, StateStore) ->
- {Module, Options} = get_config(ConfigId),
- ?MODULE:new(Id, Module, StateStore, Options).
--spec new(id(), module(), nil | module(), map()) ->
- couch_rate:limiter() | {error, Reason :: term()}.
-new(Id, Module, nil, Options) ->
- id = Id,
- module = Module,
- store = nil,
- state = Module:new(Id, Options)
- };
-new(Id, Module, Store, Options) ->
- case Store:new(Id, Module:new(Id, Options)) of
- {error, _} = Error ->
- Error;
- State ->
- id = Id,
- module = Module,
- store = Store,
- state = State
- }
- end.
--spec from_map(id(), module(), store(), map()) ->
- couch_rate:limiter()
- | {error, Reason :: term()}.
-from_map(Id, Module, nil, Map) ->
- id = Id,
- module = Module,
- store = nil,
- state = Module:from_map(Map)
- };
-from_map(Id, Module, Store, Map) ->
- case Store:new(Id, Module:from_map(Map)) of
- {error, _} = Error ->
- Error;
- State ->
- id = Id,
- module = Module,
- store = Store,
- state = State
- }
- end.
--spec update(limiter(), (
- fun(
- (id(), state()) ->
- {Result :: term(), state()}
- | {error, Reason :: term()}
- )
- )) ->
- Result :: term()
- | {Result :: term(), state()}
- | {error, Reason :: term()}.
-update(#?LIMITER{store = nil, id = Id, state = State0} = Limiter, Fun) ->
- case Fun(Id, State0) of
- {error, _Reason} = Error ->
- Error;
- {Result, State1} ->
- {Result, Limiter#?LIMITER{state = State1}}
- end;
-update(#?LIMITER{id = Id, store = Store, state = State}, Fun) ->
- Store:update(Id, State, Fun).
--spec budget(limiter()) ->
- Budget :: integer()
- | {Budget :: integer(), limiter()}
- | {error, term()}.
-budget(#?LIMITER{module = Module} = Limiter) ->
- update(Limiter, fun(Id, StateIn) ->
- Module:budget(Id, StateIn)
- end).
--spec delay(limiter()) ->
- DelayTime :: integer()
- | {DelayTime :: integer(), limiter()}
- | {error, term()}.
-delay(#?LIMITER{module = Module} = Limiter) ->
- update(Limiter, fun(Id, State) ->
- Module:delay(Id, State)
- end).
--spec wait(limiter()) ->
- ok
- | {ok, limiter()}
- | {error, term()}.
-wait(#?LIMITER{module = Module} = Limiter) ->
- update(Limiter, fun(Id, State) ->
- Module:wait(Id, State)
- end).
--spec in(limiter(), integer()) ->
- ok
- | {ok, limiter()}
- | {error, term()}.
-in(#?LIMITER{module = Module} = Limiter, Reads) ->
- update(Limiter, fun(Id, State) ->
- Module:in(Id, State, Reads)
- end).
--spec success(limiter(), integer()) ->
- ok
- | limiter()
- | {error, term()}.
-success(#?LIMITER{module = Module} = Limiter, Writes) ->
- update(Limiter, fun(Id, State) ->
- Module:success(Id, State, Writes)
- end).
--spec failure(limiter()) ->
- ok
- | limiter()
- | {error, term()}.
-failure(#?LIMITER{module = Module} = Limiter) ->
- update(Limiter, fun(Id, State) ->
- Module:failure(Id, State)
- end).
--spec is_congestion(limiter()) -> boolean().
-is_congestion(#?LIMITER{store = nil, module = Module, id = Id, state = State}) ->
- Module:is_congestion(Id, State);
-is_congestion(#?LIMITER{store = Store, module = Module, id = Id, state = State}) ->
- Module:is_congestion(Id, Store:lookup(Id, State)).
--spec format(limiter()) -> [{Key :: atom(), Value :: term()}].
-format(#?LIMITER{store = nil, module = Module, id = Id, state = State}) ->
- Module:format(Id, State);
-format(#?LIMITER{store = Store, module = Module, id = Id, state = State}) ->
- Module:format(Id, Store:lookup(Id, State)).
--spec to_map(limiter()) -> map().
-to_map(#?LIMITER{store = nil, module = Module, id = Id, state = State}) ->
- Module:to_map(Id, State);
-to_map(#?LIMITER{store = Store, module = Module, id = Id, state = State}) ->
- Module:to_map(Id, Store:lookup(Id, State)).
--spec min_latency(limiter()) -> pos_integer().
-min_latency(#?LIMITER{store = nil, module = Module, id = Id, state = State}) ->
- Module:min_latency(Id, State);
-min_latency(#?LIMITER{store = Store, module = Module, id = Id, state = State}) ->
- Module:to_map(Id, Store:lookup(Id, State)).
--spec id(limiter()) -> module().
-id(Limiter) ->
- Limiter#?
--spec module(limiter()) -> module().
-module(Limiter) ->
- Limiter#?LIMITER.module.
--spec state(limiter()) -> state().
-state(Limiter) ->
- Limiter#?LIMITER.state.
--spec store(limiter()) -> module() | nil.
-store(Limiter) ->
- Limiter#?
-get_config(ConfigId) ->
- ConfigSection = "couch_rate." ++ ConfigId,
- ModuleStr = config:get(ConfigSection, "limiter", "couch_rate_limiter"),
- Module = list_to_existing_atom(ModuleStr),
- case config:get(ConfigSection, "opts", undefined) of
- undefined ->
- {error, #{missing_key => "opts", in => ConfigSection}};
- OptionsStr ->
- Options = couch_rate_config:from_str(OptionsStr),
- lists:map(fun(Key) ->
- maps:is_key(Key, Options) orelse error(#{missing_key => Key, in => Options})
- end, [budget, target, window, sensitivity]),
- {Module, Options}
- end.
diff --git a/src/couch_rate/src/couch_rate.hrl b/src/couch_rate/src/couch_rate.hrl
deleted file mode 100644
index d19f7d8e4..000000000
--- a/src/couch_rate/src/couch_rate.hrl
+++ /dev/null
@@ -1,19 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
- {
- id :: couch_rate:id(),
- module = couch_rate_limiter :: module(),
- store = couch_rate_ets :: module() | nil,
- state :: couch_rate:state()
- }).
diff --git a/src/couch_rate/src/couch_rate_app.erl b/src/couch_rate/src/couch_rate_app.erl
deleted file mode 100644
index 2bb1621c3..000000000
--- a/src/couch_rate/src/couch_rate_app.erl
+++ /dev/null
@@ -1,28 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
- start/2,
- stop/1
-start(_StartType, _StartArgs) ->
- couch_rate_sup:start_link().
-stop(_State) ->
- ok.
diff --git a/src/couch_rate/src/couch_rate_config.erl b/src/couch_rate/src/couch_rate_config.erl
deleted file mode 100644
index 709fbc3d3..000000000
--- a/src/couch_rate/src/couch_rate_config.erl
+++ /dev/null
@@ -1,66 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-% This parser supports only maps where key is atom and value
-% is positive float or positive integer.
- from_str/1,
- to_str/1
-from_str(String) ->
- parse_map(merl:quote(String)).
-to_str(Map) when is_map(Map) ->
- StringArgs = maps:fold(fun(Key, Val, Acc) ->
- Acc ++ [atom_to_list(Key) ++ " => " ++ number_to_list(Val)]
- end, [], Map),
- "#{" ++ string:join(StringArgs, ", ") ++ "}".
-number_to_list(Int) when is_integer(Int) ->
- integer_to_list(Int);
-number_to_list(Float) when is_float(Float) ->
- float_to_list(Float).
-parse_map(MapAST) ->
- erl_syntax:type(MapAST) == map_expr
- orelse fail("Only #{field => pos_integer() | float()} syntax is supported"),
- %% Parsing map manually, since merl does not support maps
- lists:foldl(fun(AST, Bindings) ->
- NameAST = erl_syntax:map_field_assoc_name(AST),
- erl_syntax:type(NameAST) == atom
- orelse fail("Only atoms are supported as field names"),
- Name = erl_syntax:atom_value(NameAST),
- ValueAST = erl_syntax:map_field_assoc_value(AST),
- Value = case erl_syntax:type(ValueAST) of
- integer ->
- erl_syntax:integer_value(ValueAST);
- float ->
- erl_syntax:float_value(ValueAST);
- _ ->
- fail("Only pos_integer() or float() alowed as values")
- end,
- Bindings#{Name => Value}
- end, #{}, erl_syntax:map_expr_fields(MapAST)).
-fail(Msg) ->
- throw({error, Msg}). \ No newline at end of file
diff --git a/src/couch_rate/src/couch_rate_ets.erl b/src/couch_rate/src/couch_rate_ets.erl
deleted file mode 100644
index edd9d965c..000000000
--- a/src/couch_rate/src/couch_rate_ets.erl
+++ /dev/null
@@ -1,119 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
- create_tables/0,
- delete_tables/0,
- create_if_missing/2,
- new/2,
- lookup/2,
- update/3
--define(SHARDS_N, 16).
--type id() :: term().
--type state() :: term().
--type result() :: term().
--type store_state() :: term().
--spec create_if_missing(couch_rate:id(), state()) ->
- store_state().
-create_if_missing(Id, State) ->
- Tid = term_to_table(Id),
- case ets:lookup(Tid, Id) of
- [_ | _] -> ok;
- _ -> ets:insert(Tid, {Id, State})
- end,
- ok.
--spec new(couch_rate:id(), state()) ->
- store_state()
- | {error, term()}.
-new(Id, State) ->
- Tid = term_to_table(Id),
- case ets:insert_new(Tid, {Id, State}) of
- true -> ok;
- false -> {error, #{reason => already_exists, id => Id}}
- end.
--spec update(id(), store_state(), fun(
- (id(), state()) -> {state(), result()}
- )) ->
- result()
- | {error, term()}.
-update(Id, _StoreState, Fun) ->
- Tid = term_to_table(Id),
- case ets:lookup(Tid, Id) of
- [{Id, State0}] ->
- case Fun(Id, State0) of
- {Result, State1} ->
- ets:insert(Tid, {Id, State1}),
- Result;
- Error ->
- Error
- end;
- _ ->
- {error, #{reason => cannot_find, id => Id}}
- end.
--spec lookup(id(), store_state()) ->
- state()
- | {error, term()}.
-lookup(Id, _StoreState) ->
- Tid = term_to_table(Id),
- case ets:lookup(Tid, Id) of
- [{Id, State}] ->
- State;
- _ ->
- {error, #{reason => cannot_find, id => Id}}
- end.
-create_tables() ->
- Opts = [named_table, public, {read_concurrency, true}],
- [ets:new(TableName, Opts) || TableName <- table_names()],
- ok.
-delete_tables() ->
- [ets:delete(TableName) || TableName <- table_names()],
- ok.
--spec term_to_table(any()) -> atom().
-term_to_table(Term) ->
- PHash = erlang:phash2(Term),
- table_name(PHash rem ?SHARDS_N).
--dialyzer({no_return, table_names/0}).
--spec table_names() -> [atom()].
-table_names() ->
- [table_name(N) || N <- lists:seq(0, ?SHARDS_N - 1)].
--spec table_name(non_neg_integer()) -> atom().
-table_name(Id) when is_integer(Id), Id >= 0 andalso Id < ?SHARDS_N ->
- list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(Id)). \ No newline at end of file
diff --git a/src/couch_rate/src/couch_rate_limiter.erl b/src/couch_rate/src/couch_rate_limiter.erl
deleted file mode 100644
index 97a630206..000000000
--- a/src/couch_rate/src/couch_rate_limiter.erl
+++ /dev/null
@@ -1,392 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-%% This module implements an algorithm to control the consumption rate
-%% parameters such as:
-%% - batch size
-%% - delay between batches
-%% The components of the algorithm use
-%% - [ascending minima algorithm](
-%% - "Welford's method" of calculating average
- new/2,
- from_map/2,
- budget/2,
- delay/2,
- wait/2,
- in/3,
- success/3,
- failure/2,
- is_congestion/2,
- min_latency/2,
- format/2,
- to_map/2
--type msec() :: non_neg_integer().
--define(STATE, ?MODULE).
-%% This is the number below which the math would not work due to round errors
-%% In particular the default values for thresholds would be equal
--define(MIN_TARGET, 36).
--define(record_to_keyval(Name, Record),
- lists:zip(record_info(fields, Name),
- tl(tuple_to_list(Record)))).
--define(map_to_record(RecordName, Map),
- element(1, lists:foldl(fun(Field, {Record, Idx}) ->
- {setelement(Idx, Record, maps:get(Field, Map, element(Idx, Record))), Idx + 1}
- end, {#RecordName{}, 2}, record_info(fields, RecordName)))).
--define(record_to_map(RecordName, Record),
- element(1, lists:foldl(fun(Field, {Map, Idx}) ->
- {
- maps:put(Field, element(Idx, Record), Map),
- Idx + 1
- }
- end, {#{}, 2}, record_info(fields, RecordName)))).
--record(?STATE, {
- window_size = 0 :: 0 | pos_integer(),
- timer = fun now_msec/0,
- size = 1 :: pos_integer(),
- epoch = 1 :: pos_integer(),
- minimums :: queue:queue() | undefined,
- start_ts = undefined,
- mean_reads = 0.0,
- mean_writes = 0.0,
- reads = 0,
- writes = 0,
- target = 4500,
- underload_threshold = 4275, %% target * 0.95
- overload_threshold = 4725, %% target * 1.05
- delay_threshold = 4950, %% target * 1.10
- multiplicative_factor = 0.7,
- regular_delay = 100 :: timeout(),
- congested_delay = 5000 :: timeout(),
- initial_budget = 100,
- latency = 0,
- has_failures = false
--type state() :: #?STATE{}.
--spec new(couch_rate:id(), Opts :: map()) -> state().
-new(_Id, #{sensitivity := S}) when S =< 0 ->
- error("expected SensitivityTimeWindow > 0");
-new(_Id, #{target := T}) when T < ?MIN_TARGET ->
- error("the target is too small");
-new(_Id, #{budget := B, target := T, window := W, sensitivity := S} = Opts) ->
- WinSize = W div S + 1,
- validate_arguments(?map_to_record(?STATE, maps:merge(#{
- minimums => queue:new(),
- window_size => WinSize,
- initial_budget => B,
- underload_threshold => round(T * 0.95),
- overload_threshold => round(T * 1.05),
- delay_threshold => round(T * 1.07)
- }, maps:without([budget, window, sensitivity], Opts)))).
--spec from_map(couch_rate:id(), map()) -> state().
-from_map(_Id, Map) ->
- ?map_to_record(?STATE, Map).
--spec budget(couch_rate:id(), state()) ->
- {pos_integer(), state()}.
-budget(Id, #?STATE{} = State) ->
- #?STATE{
- reads = R,
- writes = W,
- mean_writes = MW,
- mean_reads = MR,
- multiplicative_factor = MultiplicativeFactor,
- target = Target,
- initial_budget = InitialBudget,
- latency = Latency
- } = State,
- case pattern(Id, State) of
- optimal ->
- {max(1, round(MR)), State};
- failed ->
- %% decrease budget
- {max(1, round(R * MultiplicativeFactor)), State};
- overloaded ->
- %% decrease budget
- {max(1, round(R * MultiplicativeFactor)), State};
- underloaded when W == 0 orelse Latency == 0 ->
- {max(1, round(MR)), State};
- underloaded ->
- ReadWriteRatio = min(1, MR / max(1, MW)),
- SingleWrite = Latency / W,
- EstimatedWrites = floor(Target / SingleWrite),
- {max(1, round(ReadWriteRatio * EstimatedWrites)), State};
- init ->
- {InitialBudget, State}
- end.
--spec delay(couch_rate:id(), state()) ->
- {pos_integer(), state()}.
-delay(Id, #?STATE{} = State) ->
- #?STATE{
- regular_delay = RD,
- congested_delay = CD
- } = State,
- case pattern(Id, State) of
- failed ->
- {CD, State};
- _ ->
- {RD, State}
- end.
--spec wait(couch_rate:id(), state()) ->
- ok.
-wait(Id, State) ->
- {Delay, _} = delay(Id, State),
- timer:sleep(Delay).
--spec in(couch_rate:id(), state(), Reads :: pos_integer()) ->
- {ok, state()}.
-in(_Id, #?STATE{timer = TimerFun} = State, Reads) ->
- {ok, State#?STATE{
- reads = Reads,
- start_ts = TimerFun()
- }}.
--spec success(couch_rate:id(), state(), Writes :: pos_integer()) ->
- {ok, state()}.
-success(_Id, #?STATE{start_ts = undefined} = State, _Writes) ->
- {ok, State};
-success(_Id, #?STATE{} = State, Writes) ->
- #?STATE{
- start_ts = TS,
- timer = TimerFun,
- reads = Reads,
- mean_reads = MeanReads,
- mean_writes = MeanWrites,
- window_size = WinSize
- } = State,
- {ok, update_min(State#?STATE{
- writes = Writes,
- mean_writes = average(MeanWrites, WinSize, Writes),
- mean_reads = average(MeanReads, WinSize, Reads),
- latency = TimerFun() - TS,
- has_failures = false
- })}.
--spec failure(couch_rate:id(), state()) -> {ok, state()}.
-failure(_Id, #?STATE{start_ts = undefined} = State) ->
- {ok, State};
-failure(_Id, #?STATE{} = State) ->
- #?STATE{
- timer = TimerFun,
- start_ts = TS
- } = State,
- {ok, update_min(State#?STATE{
- writes = 0,
- latency = TimerFun() - TS,
- has_failures = true
- })}.
--spec is_congestion(couch_rate:id(), state()) -> boolean().
-is_congestion(Id, #?STATE{} = State) ->
- case pattern(Id, State) of
- overloaded -> true;
- failed -> true;
- _ -> false
- end.
--spec format(couch_rate:id(), state()) -> [{Key :: atom(), Value :: term()}].
-format(_Id, #?STATE{minimums = M} = State) ->
- Map = ?record_to_map(?STATE, State),
- Minimums = lists:map(fun({D, V}) ->
- [{value, V}, {death, D}]
- end, queue:to_list(M)),
- maps:to_list(maps:merge(Map, #{
- minimums => Minimums
- })).
--spec to_map(couch_rate:id(), state()) -> map().
-to_map(_Id, #?STATE{} = State) ->
- ?record_to_map(?STATE, State).
--spec update_min(state()) -> state().
-update_min(#?STATE{latency = ProcessingDelay} = Q0) ->
- Q1 = remove_greater_than(Q0, ProcessingDelay),
- Q2 = append(Q1, ProcessingDelay),
- maybe_remove_first(Q2).
--spec pattern(couch_rate:id(), state()) ->
- init
- | underloaded
- | overloaded
- | optimal
- | failed.
-pattern(Id, #?STATE{} = State) ->
- #?STATE{
- underload_threshold = UnderloadThreshold,
- overload_threshold = OverloadThreshold,
- mean_writes = MW,
- has_failures = HasFailures
- } = State,
- case min_latency(Id, State) of
- MinRollingLatency when MinRollingLatency > OverloadThreshold ->
- overloaded;
- MinRollingLatency when MinRollingLatency > UnderloadThreshold ->
- optimal;
- MinRollingLatency when MinRollingLatency == 0 andalso MW == 0.0 ->
- init;
- _ when HasFailures ->
- failed;
- _ ->
- underloaded
- end.
--spec min_latency(couch_rate:id(), state()) -> pos_integer() | 0.
-min_latency(_Id, #?STATE{size = 1}) ->
- 0;
-min_latency(_Id, #?STATE{minimums = Minimums}) ->
- {value, {_, Min}} = head(Minimums),
- Min.
-validate_arguments(#?STATE{timer = TimerFun})
- when not is_function(TimerFun, 0) ->
- error("expected `timer` to be an arity 0 function");
-validate_arguments(#?STATE{window_size = WinSize})
- when WinSize < 1 ->
- error("expected `window_size` to be greater than 1");
-validate_arguments(#?STATE{initial_budget = Budget})
- when Budget < 1 ->
- error("expected `initial_budget` to be greater than 1");
-validate_arguments(#?STATE{overload_threshold = OT, target = T})
- when OT =< T ->
- error("expected `overload_threshold` to be greater than `target`");
-validate_arguments(#?STATE{underload_threshold = UT, target = T})
- when UT >= T ->
- error("expected `underload_threshold` to be less than `target`");
-validate_arguments(#?STATE{delay_threshold = DT, overload_threshold = OT})
- when DT =< OT ->
- error("expected `delay_threshold` to be greater than `overload_threshold`");
-validate_arguments(#?STATE{multiplicative_factor = MF})
- when MF < 0 orelse MF > 1 ->
- error("expected `multiplicative_factor` to be in the (0, 1) range");
-validate_arguments(#?STATE{} = State) ->
- State.
--spec remove_greater_than(state(), pos_integer()) -> state().
-remove_greater_than(#?STATE{minimums = Minimums, size = S} = State, Value) ->
- case tail(Minimums) of
- {value, {_, T}} when Value =< T ->
- NewState = State#?STATE{minimums = tail_drop(Minimums), size = S - 1},
- remove_greater_than(NewState, Value);
- {value, _} ->
- State;
- empty ->
- State#?STATE{epoch = 1}
- end.
--spec append(state(), pos_integer()) -> state().
-append(#?STATE{minimums = Minimums, epoch = E, window_size = S} = State, Value) ->
- Death = E + S,
- State#?STATE{
- minimums = tail_put(Minimums, {Death, Value}),
- epoch = E + 1,
- size = S + 1
- }.
--spec maybe_remove_first(state()) -> state().
-maybe_remove_first(#?STATE{minimums = Minimums, epoch = E, size = S} = State) ->
- case head(Minimums) of
- {value, {E, _V}} ->
- State#?STATE{minimums = head_drop(Minimums), size = S - 1};
- _ ->
- State
- end.
-% Donald Knuth’s Art of Computer Programming, Vol 2, page 232, 3rd
-% Welford method
-average(Avg, WindowSize, Value) ->
- Delta = Value - Avg,
- Avg + Delta / WindowSize.
-%% The helper functions are added because queue module
-%% naming conventions are weird
-head(Q) -> queue:peek_r(Q).
-head_drop(Q) -> queue:drop_r(Q).
-tail(Q) -> queue:peek(Q).
-tail_put(Q, V) -> queue:in_r(V, Q).
-tail_drop(Q) -> queue:drop(Q).
--spec now_msec() -> msec().
-now_msec() ->
- {Mega, Sec, Micro} = os:timestamp(),
- ((Mega * 1000000) + Sec) * 1000 + Micro div 1000. \ No newline at end of file
diff --git a/src/couch_rate/src/couch_rate_pd.erl b/src/couch_rate/src/couch_rate_pd.erl
deleted file mode 100644
index 5d79f7890..000000000
--- a/src/couch_rate/src/couch_rate_pd.erl
+++ /dev/null
@@ -1,90 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
- new/2,
- create_if_missing/2,
- lookup/2,
- update/3
--type id() :: term().
--type state() :: term().
--type result() :: term().
--type store_state() :: term().
--define(STATE_KEY, couch_rate_state).
--spec create_if_missing(couch_rate:id(), state()) -> store_state().
-create_if_missing(Id, State) ->
- case get({?STATE_KEY, Id}) of
- undefined ->
- put({?STATE_KEY, Id}, State),
- ok;
- _ ->
- ok
- end.
--spec new(couch_rate:id(), state()) ->
- store_state()
- | {error, term()}.
-new(Id, State) ->
- case get({?STATE_KEY, Id}) of
- undefined ->
- put({?STATE_KEY, Id}, State),
- ok;
- _ ->
- {error, #{reason => already_exists, id => Id}}
- end.
--spec lookup(id(), store_state()) ->
- state()
- | {error, term()}.
-lookup(Id, _StoreState) ->
- case get({?STATE_KEY, Id}) of
- undefined ->
- {error, #{reason => cannot_find, id => Id}};
- State ->
- State
- end.
--spec update(id(), store_state(), fun(
- (id(), state()) -> {state(), result()}
- )) ->
- result()
- | {error, term()}.
-update(Id, _StoreState, Fun) ->
- case get({?STATE_KEY, Id}) of
- undefined ->
- {error, #{reason => cannot_find, id => Id}};
- State ->
- case Fun(Id, State) of
- {Result, State} ->
- put({?STATE_KEY, Id}, State),
- Result;
- Error ->
- Error
- end
- end.
diff --git a/src/couch_rate/src/couch_rate_sup.erl b/src/couch_rate/src/couch_rate_sup.erl
deleted file mode 100644
index 1ce01b644..000000000
--- a/src/couch_rate/src/couch_rate_sup.erl
+++ /dev/null
@@ -1,36 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
- start_link/0,
- init/1
-start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-init([]) ->
- couch_rate_ets:create_tables(),
- Flags = #{
- strategy => one_for_one,
- intensity => 5,
- period => 10
- },
- Children = [
- ],
- {ok, {Flags, Children}}. \ No newline at end of file
diff --git a/src/couch_rate/test/exunit/couch_rate_config_test.exs b/src/couch_rate/test/exunit/couch_rate_config_test.exs
deleted file mode 100644
index 7db30d272..000000000
--- a/src/couch_rate/test/exunit/couch_rate_config_test.exs
+++ /dev/null
@@ -1,88 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-defmodule Couch.Rate.Config.Test do
- use ExUnit.Case, async: true
- use ExUnitProperties
- import StreamData
- @erlang_reserved_words[
- "after",
- "and",
- "andalso",
- "band",
- "begin",
- "bnot",
- "bor",
- "bsl",
- "bsr",
- "bxor",
- "case",
- "catch",
- "cond",
- "div",
- "end",
- "fun",
- "if",
- "let",
- "not",
- "of",
- "or",
- "orelse",
- "receive",
- "rem",
- "try",
- "when",
- "xor"
- ])
- alias :couch_rate_config, as: RLC
- test "parse valid configuration" do
- parsed = RLC.from_str(~S(#{foo => 1, bar => 2.0}))
- assert %{foo: 1, bar: 2} == parsed
- end
- property "roundtrip" do
- check all(options <- valid_config()) do
- parsed = RLC.from_str(RLC.to_str(options))
- assert options == parsed
- end
- end
- defp valid_config() do
- map_of(
- erlang_atom(),
- one_of([
- positive_integer(),
- # we only support positive float
- float(min: 0.0)
- ])
- )
- end
- defp erlang_atom() do
- bind(string(:alphanumeric), fn str ->
- bind(integer(?a..?z), fn char ->
- erlang_atom(str, char)
- end)
- end)
- end
- defp erlang_atom(str, char) do
- if MapSet.member?(@erlang_reserved_words, <<char, str::binary>>) do
- String.to_atom(<<char, char, str::binary>>)
- else
- String.to_atom(<<char, str::binary>>)
- end
- end
diff --git a/src/couch_rate/test/exunit/couch_rate_limiter_test.exs b/src/couch_rate/test/exunit/couch_rate_limiter_test.exs
deleted file mode 100644
index ff70f793a..000000000
--- a/src/couch_rate/test/exunit/couch_rate_limiter_test.exs
+++ /dev/null
@@ -1,350 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-defmodule Couch.Rate.Limiter.Test do
- use ExUnit.Case, async: true
- @transaction_timeout 5_000
- alias :couch_rate, as: RL
- describe "Stats :" do
- @scenario %{rw_ratio: 1 / 1, target: 400, write_time: 100}
- test "#{__ENV__.line} : #{inspect(@scenario)} (underloaded)" do
- {rate_limiter, measurments} = simulate(@scenario, 1000)
- stats = statistics(measurments)
- maybe_debug(rate_limiter, measurments, stats)
- assert stats.wait_time.p90 == 100,
- "expected no artificial delays for more than 90% of batches"
- budget = stats.budget
- assert floor(budget.p95) in 1..7,
- "expected budget to converge into the 1..7 range (got #{budget.p95})"
- reads = stats.mean_reads
- assert floor(reads.p95) in 1..7,
- "expected mean_read to converge into the 1..7 range (got #{reads.p95})"
- writes = stats.mean_writes
- assert round(writes.p99) in 2..6
- "expected mean_writes to converge into the 2..6 range (got #{writes.p95})"
- assert stats.latency.p95 < @transaction_timeout,
- "expected latency for 95% batches under @transaction_timout"
- found_after = initial_search_speed(measurments)
- assert found_after < 5,
- "expected to find acceptable budget in less than 5 iterations (got: #{
- found_after
- })"
- measurments
- |> initial_search()
- |> Enum.reduce(101, fn row, prev_budget ->
- assert row.budget < prev_budget,
- "expected to reduce budget while we fail"
- row.budget
- end)
- end
- @scenario %{rw_ratio: 1 / 8, target: 3900, write_time: 100}
- test "#{__ENV__.line} : #{inspect(@scenario)} (optimal)" do
- {rate_limiter, measurments} = simulate(@scenario, 1000)
- stats = statistics(measurments)
- maybe_debug(rate_limiter, measurments, stats)
- assert stats.wait_time.p90 == 100,
- "expected no artificial delays for more than 90% of batches"
- budget = stats.budget
- assert floor(budget.p95) in 4..7,
- "expected budget to converge into the 4..7 range (got #{budget.p95})"
- reads = stats.mean_reads
- assert floor(reads.p95) in 4..7,
- "expected mean_read to converge into the 4..7 range (got #{reads.p95})"
- writes = stats.mean_writes
- assert round(writes.p99) in 39..41
- "expected mean_writes to converge into the 39..41 range (got #{writes.p95})"
- assert stats.latency.p95 < @transaction_timeout,
- "expected latency for 95% of batches under @transaction_timout"
- found_after = initial_search_speed(measurments)
- assert found_after < 10,
- "expected to find acceptable budget in less than 10 iterations (got: #{
- found_after
- })"
- measurments
- |> initial_search()
- |> Enum.reduce(101, fn row, prev_budget ->
- assert row.budget < prev_budget,
- "expected to reduce budget while we fail"
- row.budget
- end)
- end
- @scenario %{rw_ratio: 1 / 20, target: 3900, write_time: 100}
- test "#{__ENV__.line} : #{inspect(@scenario)} (overloaded)" do
- # This is a worst case scenario due to big variability of wait_time and
- # big value read/write ratio
- {rate_limiter, measurments} = simulate(@scenario, 1000)
- stats = statistics(measurments)
- maybe_debug(rate_limiter, measurments, stats)
- assert stats.wait_time.p90 == 100,
- "expected no artificial delays for more than 90% of batches"
- budget = stats.budget
- assert floor(budget.p95) in 1..4
- "expected budget to converge into the 1..4 range (got #{budget.p95})"
- reads = stats.mean_reads
- assert floor(reads.p95) in 1..4
- "expected mean_read to converge into the 1..4 range (got #{reads.p95})"
- writes = stats.mean_writes
- assert round(writes.p99) in 39..41
- "expected mean_writes to converge into the 39..41 range (got #{writes.p95})"
- assert stats.latency.p90 < @transaction_timeout,
- "expected latency for 90% of batches under @transaction_timout"
- found_after = initial_search_speed(measurments)
- assert found_after < 16,
- "expected to find acceptable budget in less than 16 iterations (got: #{
- found_after
- })"
- measurments
- |> initial_search()
- |> Enum.reduce(101, fn row, prev_budget ->
- assert row.budget < prev_budget,
- "expected to reduce budget while we fail"
- row.budget
- end)
- end
- end
- defp simulate(scenario, iterations) do
- :couch_rate_ets.create_tables()
- limiter =
-, :couch_rate_limiter, nil, %{
- budget: 100,
- target:,
- # average over 20 last measurments
- window: scenario.write_time * 20,
- sensitivity: scenario.write_time,
- timer: &timer/0
- })
- result =
- Enum.reduce(0..iterations, {limiter, []}, fn _idx, {limiter, stats} ->
- {budget, limiter} = step(limiter, scenario.rw_ratio, scenario.write_time)
- {limiter, update_measurments(limiter, stats, budget)}
- end)
- :couch_rate_ets.delete_tables()
- result
- end
- defp step(limiter, read_write_ratio, write_time) do
- {reads, limiter} = RL.budget(limiter)
- writes = round(reads / read_write_ratio)
- {delay, limiter} = RL.delay(limiter)
- sleep(delay)
- data_before = RL.to_map(limiter)
- {:ok, limiter} =, reads)
- data_after = RL.to_map(limiter)
- assert data_after.size <= data_after.window_size + 1,
- "The number of elements in minimums container shouldn't grow (got: #{
- data_after.size
- })"
- if data_before.writes == 0 and
- data_after.writes == 0 and
- data_before.reads != 0 do
- assert data_before.reads > data_after.reads,
- "expected to reduce number of reads while transaction fails"
- end
- total_write_time =
- 0..writes
- |> Enum.reduce_while(0, fn _, acc ->
- write_time = :rand.normal(write_time, write_time * 0.25)
- if acc < @transaction_timeout do
- {:cont, acc + write_time}
- else
- {:halt, acc}
- end
- end)
- sleep(total_write_time)
- if total_write_time < @transaction_timeout do
- {:ok, limiter} = RL.success(limiter, writes)
- {reads, limiter}
- else
- {:ok, limiter} = RL.failure(limiter)
- {reads, limiter}
- end
- end
- defp update_measurments(limiter, stats, budget) do
- data = RL.to_map(limiter)
- {wait_time, _} = RL.delay(limiter)
- stats ++
- [
- %{
- budget: budget,
- slack: - data.latency,
- rw_ratio: data.mean_reads / max(1, data.mean_writes),
- latency: data.latency,
- new_budget: budget,
- minimum_latency: RL.min_latency(limiter),
- wait_time: wait_time,
- elements_in_min_queue: data.size,
- mean_reads: data.mean_reads,
- mean_writes: data.mean_writes,
- total_reads: data.reads,
- total_writes: data.writes
- }
- ]
- end
- defp timer() do
- now = Process.get(:time, 1)
- Process.put(:time, now + 1)
- now
- end
- defp sleep(sleep_time_in_ms) do
- now = timer()
- Process.put(:time, now + sleep_time_in_ms - 1)
- end
- defp format_table([first | _] = rows) do
- spec =
- first
- |> Map.keys()
- |> h -> {h, String.length(to_str(h))} end)
- header = first |> Map.keys() |> |> Enum.join(" , ")
- lines =
-, fn row ->
- fields =
-, fn {field, size} ->
- String.pad_trailing("#{to_str(Map.get(row, field))}", size)
- end)
- Enum.join(fields, " , ")
- end)
- Enum.join([header | lines], "\n")
- end
- defp initial_search_speed(measurments) do
- length(initial_search(measurments))
- end
- defp initial_search(measurments) do
- Enum.reduce_while(measurments, [], fn row, acc ->
- if row.total_writes == 0 do
- {:cont, acc ++ [row]}
- else
- {:halt, acc}
- end
- end)
- end
- defp statistics(measurments) do
- data =
- Enum.reduce(measurments, %{}, fn row, acc ->
- Enum.reduce(row, acc, fn {key, value}, acc ->
- Map.update(acc, key, [], fn metric ->
- metric ++ [value]
- end)
- end)
- end)
- Enum.reduce(data, %{}, fn {key, values}, acc ->
- stats = Enum.into(:bear.get_statistics(values), %{})
- {percentile, stats} = Map.pop(stats, :percentile)
- stats =
- Enum.reduce(percentile, stats, fn {key, value}, acc ->
- Map.put(acc, String.to_atom("p#{to_str(key)}"), value)
- end)
- Map.put(acc, key, stats)
- end)
- end
- defp format_stats(stats) do
- rows =
-, fn {key, values} ->
- values
- |> Enum.into(%{})
- |> Map.put(:metric, key)
- |> Map.delete(:histogram)
- end)
- format_table(rows)
- end
- defp to_str(int) when is_integer(int) do
- "#{int}"
- end
- defp to_str(float) when is_float(float) do
- "#{Float.to_string(Float.round(float, 2))}"
- end
- defp to_str(atom) when is_atom(atom) do
- Atom.to_string(atom)
- end
- defp to_str(string) when is_binary(string) do
- string
- end
- defp to_map(rate_limiter) do
- RL.to_map(rate_limiter)
- end
- defp maybe_debug(rate_limiter, measurments, stats) do
- if System.fetch_env("EXUNIT_DEBUG") != :error do
- IO.puts("")
- IO.puts("rate_limiter: #{inspect(to_map(rate_limiter))}")
- IO.puts("measurments: #{inspect(measurments)}")
- IO.puts("stats: #{inspect(stats)}")
- IO.puts("\n" <> format_table(measurments) <> "\n" <> format_stats(stats))
- end
- end
diff --git a/src/couch_rate/test/exunit/test_helper.exs b/src/couch_rate/test/exunit/test_helper.exs
deleted file mode 100644
index 9b9d6ef94..000000000
--- a/src/couch_rate/test/exunit/test_helper.exs
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
diff --git a/src/couch_views/ b/src/couch_views/
index 5647913f0..09696da82 100644
--- a/src/couch_views/
+++ b/src/couch_views/
@@ -7,42 +7,12 @@ Currently only map indexes are supported and it will always return the full inde
Code layout:
* `couch_views` - Main entry point to query a view
-* `couch_views_reader` - Reads from the index for queries
+* `couch_views_encoding` - Encodes view keys that are byte comparable following CouchDB view sort order.
+* `couch_views_fdb` - Maps view operations to FoundationDB logic.
+* `couch_views_http` - View specific helpers for chttpd
* `couch_views_indexer` - `couch_jobs` worker that builds an index from the changes feed.
+* `couch_views_reader` - Reads from the index for queries
* `couch_vews_jobs` - `couch_views` interactions with `couch_jobs`. It handles adding index jobs and subscribes to jobs.
-* `couch_views_fdb` - Maps view operations to FoundationDB logic.
-* `couch_views_encoding` - Encodes view keys that are byte comparable following CouchDB view sort order.
* `couch_views_server` - Spawns `couch_views_indexer` workers to handle index update jobs.
-# Configuration
-## Configuring rate limiter
-Here is the example of configuration used in `couch_view` application:
-limiter = couch_rate_limiter
-opts = #{budget => 100, target => 2500, window => 60000, sensitivity => 1000}
-Supported fields in `opts`:
-* `budget` - the initial value for estimated batch size
-* `target` - the amount in msec which we try to maintain for batch processing time
-* `window` - time interval for contention detector
-* `sensitivity` - minimal interval within the `window`
-Unsupported fields in `opts` (if you really know what you are doing):
-* `window_size` - how many batches to consider in contention detector
-* `timer` - this is used for testing to fast forward time `fun() -> current_time_in_ms() end`
-* `target` - the amount in msec which we try to maintain for batch processing time
-* `underload_threshold` - a threshold below which we would try to increase the budget
-* `overload_threshold` - a threshold above which we would start decreasing the budget
-* `delay_threshold` - a threshold above which we would start introducing delays between batches
-* `multiplicative_factor` - determines how fast we are going to decrease budget (must be in (0..1) range)
-* `regular_delay` - delay between batches when there is no overload
-* `congested_delay` - delay between batches when there is an overload
-* `initial_budget` - initial value for budget to start with
+* `couch_views_updater` - Update interactive indexes during doc update transactions
+* `couch_views_util` - Various utility functions
diff --git a/src/couch_views/src/ b/src/couch_views/src/
index cb8285ac2..985c503cd 100644
--- a/src/couch_views/src/
+++ b/src/couch_views/src/
@@ -28,7 +28,6 @@
- couch_eval,
- couch_rate
+ couch_eval
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 9c8be6fca..737b6f880 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -78,8 +78,6 @@ init() ->
fail_job(Job, Data, sig_changed, "Design document was modified")
- Limiter = couch_rate:create_if_missing({DbName, DDocId}, "views"),
State = #{
tx_db => undefined,
db_uuid => DbUUID,
@@ -90,8 +88,8 @@ init() ->
job => Job,
job_data => Data,
count => 0,
+ limit => num_changes(),
changes_done => 0,
- limiter => Limiter,
doc_acc => [],
design_opts => Mrst#mrst.design_opts
@@ -104,7 +102,6 @@ init() ->
error:database_does_not_exist ->
fail_job(Job, Data, db_deleted, "Database was deleted");
Error:Reason ->
- couch_rate:failure(Limiter),
NewRetry = Retries + 1,
RetryLimit = retry_limit(),
@@ -165,48 +162,27 @@ add_error(Error, Reason, Data) ->
update(#{} = Db, Mrst0, State0) ->
- Limiter = maps:get(limiter, State0),
- case couch_rate:budget(Limiter) of
- 0 ->
- couch_rate:wait(Limiter),
- update(Db, Mrst0, State0);
- Limit ->
- {Mrst1, State1} = do_update(Db, Mrst0, State0#{limit => Limit, limiter => Limiter}),
- case State1 of
- finished ->
- couch_eval:release_map_context(Mrst1#mrst.qserver);
- _ ->
- couch_rate:wait(Limiter),
- update(Db, Mrst1, State1)
- end
- end.
-do_update(Db, Mrst0, State0) ->
- fabric2_fdb:transactional(Db, fun(TxDb) ->
+ {Mrst2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) ->
State1 = get_update_start_state(TxDb, Mrst0, State0),
{ok, State2} = fold_changes(State1),
count := Count,
+ limit := Limit,
doc_acc := DocAcc,
last_seq := LastSeq,
- limit := Limit,
- limiter := Limiter,
view_vs := ViewVS,
changes_done := ChangesDone0,
design_opts := DesignOpts
} = State2,
DocAcc1 = fetch_docs(TxDb, DesignOpts, DocAcc),
- couch_rate:in(Limiter, Count),
{Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
- WrittenDocs = write_docs(TxDb, Mrst1, MappedDocs, State2),
- ChangesDone = ChangesDone0 + WrittenDocs,
+ write_docs(TxDb, Mrst1, MappedDocs, State2),
- couch_rate:success(Limiter, WrittenDocs),
+ ChangesDone = ChangesDone0 + length(DocAcc),
case Count < Limit of
true ->
@@ -225,7 +201,14 @@ do_update(Db, Mrst0, State0) ->
view_seq := LastSeq
- end).
+ end),
+ case State4 of
+ finished ->
+ couch_eval:release_map_context(Mrst2#mrst.qserver);
+ _ ->
+ update(Db, Mrst2, State4)
+ end.
maybe_set_build_status(_TxDb, _Mrst1, not_found, _State) ->
@@ -368,16 +351,14 @@ write_docs(TxDb, Mrst, Docs, State) ->
KeyLimit = key_size_limit(),
ValLimit = value_size_limit(),
- DocsNumber = lists:foldl(fun(Doc0, N) ->
+ lists:foreach(fun(Doc0) ->
Doc1 = calculate_kv_sizes(Mrst, Doc0, KeyLimit, ValLimit),
- couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc1),
- N + 1
- end, 0, Docs),
+ couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc1)
+ end, Docs),
if LastSeq == false -> ok; true ->
couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq)
- end,
- DocsNumber.
+ end.
fetch_docs(Db, DesignOpts, Changes) ->
@@ -563,6 +544,10 @@ fail_job(Job, Data, Error, Reason) ->
+num_changes() ->
+ config:get_integer("couch_views", "change_limit", 100).
retry_limit() ->
config:get_integer("couch_views", "retry_limit", 3).
@@ -572,4 +557,4 @@ key_size_limit() ->
value_size_limit() ->
- config:get_integer("couch_views", "value_size_limit", ?VALUE_SIZE_LIMIT). \ No newline at end of file
+ config:get_integer("couch_views", "value_size_limit", ?VALUE_SIZE_LIMIT).
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
index cb8378f01..cff3a2e54 100644
--- a/src/couch_views/test/couch_views_indexer_test.erl
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -52,7 +52,6 @@ indexer_test_() ->
- ?TDEF_FE(index_budget_is_changing),
?TDEF_FE(index_can_recover_from_crash, 60)
@@ -378,55 +377,6 @@ index_autoupdater_callback(Db) ->
?assertEqual(ok, couch_views_jobs:wait_for_job(JobId,, DbSeq)).
-index_budget_is_changing(Db) ->
- ok = meck:new(couch_rate, [passthrough]),
- ok = meck:expect(couch_rate, budget, fun(State) ->
- meck:passthrough([State])
- end),
- LimiterOpts = #{
- budget => 100,
- sensitivity => 500,
- target => 500,
- timer => fun timer/0,
- window => 2000
- },
- ok = meck:expect(couch_rate, create_if_missing, fun(Id, Module, Store, _Options) ->
- meck:passthrough([Id, Module, Store, LimiterOpts])
- end),
- ok = meck:expect(couch_rate, wait, fun(State) ->
- Delay = couch_rate:delay(State),
- put(time, timer() + Delay - 1)
- end),
- DDoc = create_ddoc(),
- Docs = lists:map(fun doc/1, lists:seq(1, 200)),
- {ok, _} = fabric2_db:update_docs(Db, [DDoc | Docs], []),
- {ok, _Out} = couch_views:query(
- Db,
- DDoc,
- <<"map_fun2">>,
- fun fold_fun/2,
- [],
- #mrargs{}
- ),
- ?assert(length(lists:usort(budget_history())) > 1).
-timer() ->
- get(time) == undefined andalso put(time, 1),
- Now = get(time),
- put(time, Now + 1),
- Now.
-budget_history() ->
- [Result || {_Pid, {couch_rate, budget, _}, Result} <- meck:history(couch_rate)].
multiple_design_docs(Db) ->
Cleanup = fun() ->
@@ -487,8 +437,7 @@ handle_db_recreated_when_running(Db) ->
% To intercept job building while it is running ensure updates happen one
% row at a time.
- ok = meck:new(couch_rate, [passthrough]),
- ok = meck:expect(couch_rate, budget, ['_'], meck:val(1)),
+ config:set("couch_view", "change_limit", "1", false),
diff --git a/src/couch_views/test/couch_views_server_test.erl b/src/couch_views/test/couch_views_server_test.erl
index 23c807cc2..3c0c0a86a 100644
--- a/src/couch_views/test/couch_views_server_test.erl
+++ b/src/couch_views/test/couch_views_server_test.erl
@@ -45,7 +45,6 @@ setup() ->
Ctx = test_util:start_couch([
- couch_rate,
diff --git a/src/couch_views/test/couch_views_trace_index_test.erl b/src/couch_views/test/couch_views_trace_index_test.erl
index f5ea37982..03c21a34a 100644
--- a/src/couch_views/test/couch_views_trace_index_test.erl
+++ b/src/couch_views/test/couch_views_trace_index_test.erl
@@ -51,7 +51,7 @@ indexer_test_() ->
setup() ->
- test_util:start_couch([fabric, couch_js, couch_rate]).
+ test_util:start_couch([fabric, couch_js]).
cleanup(Ctx) ->