summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorILYA Khlopotov <iilyak@apache.org>2020-02-13 12:23:42 -0800
committerILYA Khlopotov <iilyak@apache.org>2020-04-02 03:41:40 -0700
commit85f81d88018fe526f1f216d673a9bbc847cbd81c (patch)
tree823146246a5af92fdc3d3382d1ea7baea2f7386a
parent4707af418eefe2988a061c53e122c774c52c339a (diff)
downloadcouchdb-85f81d88018fe526f1f216d673a9bbc847cbd81c.tar.gz
Use `couch_rate` application for `couch_view`
-rw-r--r--.credo.exs1
-rw-r--r--.gitignore1
-rw-r--r--Makefile1
-rw-r--r--mix.exs8
-rw-r--r--mix.lock1
-rw-r--r--rebar.config.script1
-rw-r--r--rel/files/eunit.ini3
-rw-r--r--rel/overlay/etc/default.ini4
-rw-r--r--src/couch_rate/README.md155
-rw-r--r--src/couch_rate/src/couch_rate.app.src24
-rw-r--r--src/couch_rate/src/couch_rate.erl318
-rw-r--r--src/couch_rate/src/couch_rate.hrl19
-rw-r--r--src/couch_rate/src/couch_rate_app.erl28
-rw-r--r--src/couch_rate/src/couch_rate_config.erl66
-rw-r--r--src/couch_rate/src/couch_rate_ets.erl119
-rw-r--r--src/couch_rate/src/couch_rate_limiter.erl387
-rw-r--r--src/couch_rate/src/couch_rate_pd.erl90
-rw-r--r--src/couch_rate/src/couch_rate_sup.erl36
-rw-r--r--src/couch_rate/test/exunit/couch_rate_config_test.exs88
-rw-r--r--src/couch_rate/test/exunit/couch_rate_limiter_test.exs350
-rw-r--r--src/couch_rate/test/exunit/test_helper.exs14
-rw-r--r--src/couch_views/README.md33
-rw-r--r--src/couch_views/src/couch_views.app.src3
-rw-r--r--src/couch_views/src/couch_views_indexer.erl60
-rw-r--r--src/couch_views/test/couch_views_indexer_test.erl55
-rw-r--r--src/couch_views/test/couch_views_trace_index_test.erl2
26 files changed, 1839 insertions, 28 deletions
diff --git a/.credo.exs b/.credo.exs
index bd26f407c..112561b95 100644
--- a/.credo.exs
+++ b/.credo.exs
@@ -37,6 +37,7 @@
~r"/src/metrics",
~r"/src/minerl",
~r"/src/parse_trans",
+ ~r"/src/stream_data",
~r"/src/ssl_verify_fun",
~r"/test/elixir/deps/"
]
diff --git a/.gitignore b/.gitignore
index bf45d1a4f..955403a98 100644
--- a/.gitignore
+++ b/.gitignore
@@ -76,6 +76,7 @@ src/rebar/
src/recon/
src/smoosh/
src/snappy/
+src/stream_data/
src/ssl_verify_fun/
src/thrift_protocol/
src/triq/
diff --git a/Makefile b/Makefile
index 2f5df90b9..b3eb64c99 100644
--- a/Makefile
+++ b/Makefile
@@ -162,6 +162,7 @@ endif
check-fdb:
make eunit apps=couch_eval,couch_expiring_cache,ctrace,couch_jobs,couch_views,fabric
make elixir tests=test/elixir/test/basics_test.exs,test/elixir/test/replication_test.exs,test/elixir/test/map_test.exs,test/elixir/test/all_docs_test.exs,test/elixir/test/bulk_docs_test.exs
+ make exunit tests=src/couch_rate/test/exunit/
.PHONY: eunit
# target: eunit - Run EUnit tests, use EUNIT_OPTS to provide custom options
diff --git a/mix.exs b/mix.exs
index 29c81fa49..480d426b1 100644
--- a/mix.exs
+++ b/mix.exs
@@ -49,11 +49,14 @@ defmodule CouchDBTest.Mixfile do
# Run "mix help compile.app" to learn about applications.
def application do
[
- extra_applications: [:logger],
+ extra_applications: extra_applications(Mix.env()),
applications: [:httpotion]
]
end
+ defp extra_applications(:test), do: [:logger, :stream_data]
+ defp extra_applications(_), do: [:logger]
+
# Specifies which paths to compile per environment.
defp elixirc_paths(:test), do: ["test/elixir/lib", "test/elixir/test/support"]
defp elixirc_paths(:integration), do: ["test/elixir/lib", "test/elixir/test/support"]
@@ -68,7 +71,8 @@ defmodule CouchDBTest.Mixfile do
{:jiffy, path: Path.expand("src/jiffy", __DIR__)},
{:ibrowse,
path: Path.expand("src/ibrowse", __DIR__), override: true, compile: false},
- {:credo, "~> 1.2.0", only: [:dev, :test, :integration], runtime: false}
+ {:credo, "~> 1.2.0", only: [:dev, :test, :integration], runtime: false},
+ {:stream_data, "~> 0.4.3", only: [:dev, :test, :integration], runtime: false}
]
end
diff --git a/mix.lock b/mix.lock
index c03e11f64..7a155c6bb 100644
--- a/mix.lock
+++ b/mix.lock
@@ -14,5 +14,6 @@
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.5", "6eaf7ad16cb568bb01753dbbd7a95ff8b91c7979482b95f38443fe2c8852a79b", [:make, :mix, :rebar3], [], "hexpm", "13104d7897e38ed7f044c4de953a6c28597d1c952075eb2e328bc6d6f2bfc496"},
+ "stream_data": {:hex, :stream_data, "0.4.3", "62aafd870caff0849a5057a7ec270fad0eb86889f4d433b937d996de99e3db25", [:mix], [], "hexpm", "7dafd5a801f0bc897f74fcd414651632b77ca367a7ae4568778191fc3bf3a19a"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"},
}
diff --git a/rebar.config.script b/rebar.config.script
index 269511d8e..6f9f65c73 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -123,6 +123,7 @@ SubDirs = [
"src/couch_index",
"src/couch_mrview",
"src/couch_js",
+ "src/couch_rate",
"src/couch_replicator",
"src/couch_plugins",
"src/couch_pse_tests",
diff --git a/rel/files/eunit.ini b/rel/files/eunit.ini
index 2b73ab307..20277f288 100644
--- a/rel/files/eunit.ini
+++ b/rel/files/eunit.ini
@@ -40,3 +40,6 @@ startup_jitter = 0
[fabric]
; disable index auto-updater to avoid interfering with some of the tests
index_updater_enabled = false
+
+[couch_rate.views]
+opts = #{budget => 100, target => 500, window => 6000, sensitivity => 200, congested_delay => 1} \ No newline at end of file
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 749cdd27f..fd0aa7763 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -699,3 +699,7 @@ compaction = false
; log every generated trace by including the following:
;
; all = (#{}) -> true
+
+[couch_rate.views]
+limiter = couch_rate_limiter
+opts = #{budget => 100, target => 2500, window => 60000, sensitivity => 1000} \ No newline at end of file
diff --git a/src/couch_rate/README.md b/src/couch_rate/README.md
new file mode 100644
index 000000000..530da1a99
--- /dev/null
+++ b/src/couch_rate/README.md
@@ -0,0 +1,155 @@
+# Description
+
+The `couch_rate` application implements a generic rate limiter which can
+be used to control batch size and delay between batches. It was initially
+designed for background index build to find an optimal batch size to utilize
+the FDB transaction up to configured `target` parameter. The application
+provides an API to plug custom rate limiting logic when needed.
+
+# Default rate limit logic
+
+The `couch_rate_limiter` is the rate limit module used by default.
+The module tracks average number of reads and writes over specified
+time period. It uses average read/write numbers to calculate an
+approximate value for read/write ratio. Then the read/write ratio is
+used to convert estimated amount of writes into batch size.
+
+# Configuration
+
+## API based usage
+
+In the simplest use case the only mandatory keys `new/3` expects are:
+* `budget` - the initial value for estimated batch size
+* `target` - the amount in msec which we try to maintain for batch processing time
+* `window` - time interval for contention detector
+* `sensitivity` - minimal interval within the `window`
+
+We choose sane default values for the rest of the parameters.
+
+* `window_size = window div sensitivity + 1`
+* `underload_threshold = round(target * 0.95)`
+* `overload_threshold = round(target * 1.05)`
+* `delay_threshold = round(target * 1.07)`
+
+Due to the use of `round` in defaults calculation the `target` cannot be less
+than `36` msec. Otherwise some of the thresholds become equal which breaks the
+algorithm.
+
+In the case when you need to specify custom parameters, the following keys
+are supported:
+
+* `window_size` - how many batches to consider in contention detector
+* `timer` - this is used for testing to fast forward time `fun() -> current_time_in_ms() end`
+* `target` - the amount in msec which we try to maintain for batch processing time
+* `underload_threshold` - a threshold below which we would try to increase the budget
+* `overload_threshold` - a threshold above which we would start decreasing the budget
+* `delay_threshold` - a threshold above which we would start introducing delays between batches
+* `multiplicative_factor` - determines how fast we are going to decrease budget (must be in (0..1) range)
+* `regular_delay` - delay between batches when there is no overload
+* `congested_delay` - delay between batches when there is an overload
+* `initial_budget` - initial value for budget to start with
+
+## default.ini based usage
+
+The users of the `couch_rate` application pass the `ConfigId` parameter.
+When calling `couch_rate:new` and `couch_rate:create_if_missing`.
+The `couch_rate` application uses this information to construct name of the
+configuration section to use to get configuration parameters. The configuration
+section is constructed using `"couch_rate." ++ ConfigId`.
+The parameters are encoded using erlang map syntax.
+Limitation of the map parser:
+
+* Keys must be atoms
+* Values are either integers or floats
+* We only support positive values in the map
+* Configuration object cannot use erlang reserved words in keys:
+ `after`, `and`, `andalso`, `band`, `begin`, `bnot`, `bor`,
+ `bsl`, `bsr`, `bxor`, `case`, `catch`, `cond`, `div`, `end`
+ `fun`, `if`, `let`, `not`, `of`, `or`, `orelse`, `receive`
+ `rem`, `try`, `when`, `xor`
+
+The auxiliary `couch_rate_config` module implements the following API:
+
+* `couch_rate_config:from_str/1` - parses a string representation of parameters
+* `couch_rate_config:to_str/1` - converts parameters to string (used in testing)
+
+Here is the example of configuration used in `couch_view` application:
+
+```
+[couch_rate.views]
+limiter = couch_rate_limiter
+opts = #{budget => 100, target => 2500, window => 60000, sensitivity => 1000}
+```
+
+In the `couch_view` application it is used as follows:
+
+```
+Limiter = couch_rate:create_if_missing({DbName, DDocId}, "views"),
+```
+
+# API
+
+The application implements two APIs. Both APIs are supported by `couch_rate`
+module. The API variants are:
+
+* explicit state passing
+* state store based approach
+
+The API is chosen based on the `StoreModule` argument passed to `new/4`.
+Currently we support following values for `StoreModule`:
+
+* `nil` - this value indicates that explicit state passing would be used
+* `couch_rate_ets` - ets based global state store (ets tables are owned by app supervisor)
+* `couch_rate_pd` - process dictionary based local state store
+
+The "explicit state passing" style returns a tuple `{Result :: term(), state()}`.
+The result is the same as for state store based API.
+
+
+## State store based APIs of `couch_rate` module.
+
+All functions can return `{error, Reason :: term()}` in case of errors.
+This detail is omitted below.
+
+* `create_if_missing(Id :: id(), Module :: module(), Store :: module(), Options :: map()) -> limiter()` - create new rate limiter instance
+* `new(Id :: id(), Module :: module(), Store :: module(), Options :: map()) -> limiter()` - create new rate limiter instance
+* `budget(limiter()) -> Budget :: integer().` - get batch size
+* `delay(limiter()) -> Delay :: timeout().` - return delay in msec between batches
+* `wait(limiter()) -> ok` - block the caller for amount of time returned by `delay/1`
+* `in(limiter(), Reads :: integer()) -> limiter()` - notify rate limiter on the amount of reads were actually done (could be less than `budget`)
+* `success(limiter(), Writes :: integer()) -> limiter()` - how many writes happen
+* `failure(limiter()) -> limiter()` - called instead of `success/2` when failure happen
+* `is_congestion(limiter()) -> boolean()` - returns `false` when congestion is detected
+* `format(limiter()) -> [{Key :: atom(), Value :: term()}]` - return key value list representing important aspects of the limiter state
+* `id(limiter()) -> id()` - returns `id()` of the rate limiter
+* `module(limiter()) -> module()` - returns callback module implementing rate limiting logic.
+* `state(limiter()) -> state()` - returns internal state of rate limiter.
+* `store(limiter()) -> module() | nil` - returns store state backend.
+
+# Testing
+
+The test suite is written in Elixir.
+
+## Running all tests
+
+```
+make couch && ERL_LIBS=`pwd`/src mix test --trace src/couch_rate/test/exunit/
+```
+
+## Running specific test suite
+
+```
+make couch && ERL_LIBS=`pwd`/src mix test --trace src/couch_rate/test/exunit/couch_rate_limiter_test.exs
+```
+
+## Running specific test using line number
+
+```
+make couch && ERL_LIBS=`pwd`/src mix test --trace src/couch_rate/test/exunit/couch_rate_limiter_test.exs:10
+```
+
+## Running traces with stats output
+
+```
+make couch && ERL_LIBS=`pwd`/src EXUNIT_DEBUG=true mix test --trace src/couch_rate/test/exunit/couch_rate_limiter_test.exs
+``` \ No newline at end of file
diff --git a/src/couch_rate/src/couch_rate.app.src b/src/couch_rate/src/couch_rate.app.src
new file mode 100644
index 000000000..ed6de81d6
--- /dev/null
+++ b/src/couch_rate/src/couch_rate.app.src
@@ -0,0 +1,24 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+ {application, couch_rate, [
+ {description, "Simple rate limiter"},
+ {vsn, git},
+ {registered, [
+ ]},
+ {applications, [
+ kernel,
+ stdlib,
+ syntax_tools
+ ]},
+ {mod, {couch_rate_app, []}}
+]}.
diff --git a/src/couch_rate/src/couch_rate.erl b/src/couch_rate/src/couch_rate.erl
new file mode 100644
index 000000000..24bbcc2a5
--- /dev/null
+++ b/src/couch_rate/src/couch_rate.erl
@@ -0,0 +1,318 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rate).
+
+-include("couch_rate.hrl").
+
+-export([
+ create_if_missing/2,
+ create_if_missing/3,
+ create_if_missing/4,
+ new/2,
+ new/3,
+ new/4,
+ from_map/4,
+ budget/1,
+ delay/1,
+ wait/1,
+ in/2,
+ success/2,
+ failure/1,
+ is_congestion/1,
+ min_latency/1,
+ format/1,
+ to_map/1,
+ id/1,
+ module/1,
+ state/1,
+ store/1
+]).
+
+-define(LIMITER, ?MODULE).
+
+-type id() :: term().
+-type state() :: term().
+-type store() :: module().
+
+-opaque limiter() :: #?LIMITER{}.
+
+-export_type([
+ id/0,
+ state/0,
+ limiter/0
+]).
+
+-spec create_if_missing(id(), string()) ->
+ couch_rate:limiter() | {error, Reason :: term()}.
+
+create_if_missing(Id, ConfigId) ->
+ ?MODULE:create_if_missing(Id, ConfigId, couch_rate_ets).
+
+-spec create_if_missing(id(), string(), nil | module()) ->
+ couch_rate:limiter() | {error, Reason :: term()}.
+
+create_if_missing(Id, ConfigId, StateStore) ->
+ {Module, Options} = get_config(ConfigId),
+ ?MODULE:create_if_missing(Id, Module, StateStore, Options).
+
+-spec create_if_missing(id(), module(), nil | module(), map()) ->
+ couch_rate:limiter() | {error, Reason :: term()}.
+
+create_if_missing(Id, Module, nil, Options) ->
+ #?LIMITER{
+ id = Id,
+ module = Module,
+ store = nil,
+ state = Module:new(Id, Options)
+ };
+
+create_if_missing(Id, Module, Store, Options) ->
+ case Store:create_if_missing(Id, Module:new(Id, Options)) of
+ {error, _} = Error ->
+ Error;
+ State ->
+ #?LIMITER{
+ id = Id,
+ module = Module,
+ store = Store,
+ state = State
+ }
+ end.
+
+
+-spec new(id(), string()) ->
+ couch_rate:limiter() | {error, Reason :: term()}.
+
+new(Id, ConfigId) ->
+ ?MODULE:new(Id, ConfigId, couch_rate_ets).
+
+-spec new(id(), string(), module()) ->
+ couch_rate:limiter() | {error, Reason :: term()}.
+
+new(Id, ConfigId, StateStore) ->
+ {Module, Options} = get_config(ConfigId),
+ ?MODULE:new(Id, Module, StateStore, Options).
+
+
+-spec new(id(), module(), nil | module(), map()) ->
+ couch_rate:limiter() | {error, Reason :: term()}.
+
+new(Id, Module, nil, Options) ->
+ #?LIMITER{
+ id = Id,
+ module = Module,
+ store = nil,
+ state = Module:new(Id, Options)
+ };
+
+new(Id, Module, Store, Options) ->
+ case Store:new(Id, Module:new(Id, Options)) of
+ {error, _} = Error ->
+ Error;
+ State ->
+ #?LIMITER{
+ id = Id,
+ module = Module,
+ store = Store,
+ state = State
+ }
+ end.
+
+
+-spec from_map(id(), module(), store(), map()) ->
+ couch_rate:limiter()
+ | {error, Reason :: term()}.
+
+from_map(Id, Module, nil, Map) ->
+ #?LIMITER{
+ id = Id,
+ module = Module,
+ store = nil,
+ state = Module:from_map(Map)
+ };
+
+from_map(Id, Module, Store, Map) ->
+ case Store:new(Id, Module:from_map(Map)) of
+ {error, _} = Error ->
+ Error;
+ State ->
+ #?LIMITER{
+ id = Id,
+ module = Module,
+ store = Store,
+ state = State
+ }
+ end.
+
+
+-spec update(limiter(), (
+ fun(
+ (id(), state()) ->
+ {Result :: term(), state()}
+ | {error, Reason :: term()}
+ )
+ )) ->
+ Result :: term()
+ | {Result :: term(), state()}
+ | {error, Reason :: term()}.
+
+update(#?LIMITER{store = nil, id = Id, state = State0} = Limiter, Fun) ->
+ case Fun(Id, State0) of
+ {error, _Reason} = Error ->
+ Error;
+ {Result, State1} ->
+ {Result, Limiter#?LIMITER{state = State1}}
+ end;
+
+update(#?LIMITER{id = Id, store = Store, state = State}, Fun) ->
+ Store:update(Id, State, Fun).
+
+
+-spec budget(limiter()) ->
+ Budget :: integer()
+ | {Budget :: integer(), limiter()}
+ | {error, term()}.
+
+budget(#?LIMITER{module = Module} = Limiter) ->
+ update(Limiter, fun(Id, StateIn) ->
+ Module:budget(Id, StateIn)
+ end).
+
+
+-spec delay(limiter()) ->
+ DelayTime :: integer()
+ | {DelayTime :: integer(), limiter()}
+ | {error, term()}.
+
+delay(#?LIMITER{module = Module} = Limiter) ->
+ update(Limiter, fun(Id, State) ->
+ Module:delay(Id, State)
+ end).
+
+
+-spec wait(limiter()) ->
+ ok
+ | {ok, limiter()}
+ | {error, term()}.
+
+wait(#?LIMITER{module = Module} = Limiter) ->
+ update(Limiter, fun(Id, State) ->
+ Module:wait(Id, State)
+ end).
+
+
+-spec in(limiter(), integer()) ->
+ ok
+ | {ok, limiter()}
+ | {error, term()}.
+
+in(#?LIMITER{module = Module} = Limiter, Reads) ->
+ update(Limiter, fun(Id, State) ->
+ Module:in(Id, State, Reads)
+ end).
+
+
+-spec success(limiter(), integer()) ->
+ ok
+ | limiter()
+ | {error, term()}.
+
+success(#?LIMITER{module = Module} = Limiter, Writes) ->
+ update(Limiter, fun(Id, State) ->
+ Module:success(Id, State, Writes)
+ end).
+
+
+-spec failure(limiter()) ->
+ ok
+ | limiter()
+ | {error, term()}.
+
+failure(#?LIMITER{module = Module} = Limiter) ->
+ update(Limiter, fun(Id, State) ->
+ Module:failure(Id, State)
+ end).
+
+
+-spec is_congestion(limiter()) -> boolean().
+
+is_congestion(#?LIMITER{store = nil, module = Module, id = Id, state = State}) ->
+ Module:is_congestion(Id, State);
+
+is_congestion(#?LIMITER{store = Store, module = Module, id = Id, state = State}) ->
+ Module:is_congestion(Id, Store:lookup(Id, State)).
+
+
+-spec format(limiter()) -> [{Key :: atom(), Value :: term()}].
+
+format(#?LIMITER{store = nil, module = Module, id = Id, state = State}) ->
+ Module:format(Id, State);
+
+format(#?LIMITER{store = Store, module = Module, id = Id, state = State}) ->
+ Module:format(Id, Store:lookup(Id, State)).
+
+
+-spec to_map(limiter()) -> map().
+
+to_map(#?LIMITER{store = nil, module = Module, id = Id, state = State}) ->
+ Module:to_map(Id, State);
+
+to_map(#?LIMITER{store = Store, module = Module, id = Id, state = State}) ->
+ Module:to_map(Id, Store:lookup(Id, State)).
+
+-spec min_latency(limiter()) -> pos_integer().
+
+min_latency(#?LIMITER{store = nil, module = Module, id = Id, state = State}) ->
+ Module:min_latency(Id, State);
+
+min_latency(#?LIMITER{store = Store, module = Module, id = Id, state = State}) ->
+ Module:to_map(Id, Store:lookup(Id, State)).
+
+
+-spec id(limiter()) -> module().
+
+id(Limiter) ->
+ Limiter#?LIMITER.id.
+
+
+-spec module(limiter()) -> module().
+
+module(Limiter) ->
+ Limiter#?LIMITER.module.
+
+
+-spec state(limiter()) -> state().
+
+state(Limiter) ->
+ Limiter#?LIMITER.state.
+
+-spec store(limiter()) -> module() | nil.
+
+store(Limiter) ->
+ Limiter#?LIMITER.store.
+
+
+get_config(ConfigId) ->
+ ConfigSection = "couch_rate." ++ ConfigId,
+ ModuleStr = config:get(ConfigSection, "limiter", "couch_rate_limiter"),
+ Module = list_to_existing_atom(ModuleStr),
+ case config:get(ConfigSection, "opts", undefined) of
+ undefined ->
+ {error, #{missing_key => "opts", in => ConfigSection}};
+ OptionsStr ->
+ Options = couch_rate_config:from_str(OptionsStr),
+ lists:map(fun(Key) ->
+ maps:is_key(Key, Options) orelse error(#{missing_key => Key, in => Options})
+ end, [budget, target, window, sensitivity]),
+ {Module, Options}
+ end.
diff --git a/src/couch_rate/src/couch_rate.hrl b/src/couch_rate/src/couch_rate.hrl
new file mode 100644
index 000000000..d19f7d8e4
--- /dev/null
+++ b/src/couch_rate/src/couch_rate.hrl
@@ -0,0 +1,19 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-record(couch_rate,
+ {
+ id :: couch_rate:id(),
+ module = couch_rate_limiter :: module(),
+ store = couch_rate_ets :: module() | nil,
+ state :: couch_rate:state()
+ }).
diff --git a/src/couch_rate/src/couch_rate_app.erl b/src/couch_rate/src/couch_rate_app.erl
new file mode 100644
index 000000000..2bb1621c3
--- /dev/null
+++ b/src/couch_rate/src/couch_rate_app.erl
@@ -0,0 +1,28 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rate_app).
+
+-behaviour(application).
+
+-export([
+ start/2,
+ stop/1
+]).
+
+
+start(_StartType, _StartArgs) ->
+ couch_rate_sup:start_link().
+
+
+stop(_State) ->
+ ok.
diff --git a/src/couch_rate/src/couch_rate_config.erl b/src/couch_rate/src/couch_rate_config.erl
new file mode 100644
index 000000000..709fbc3d3
--- /dev/null
+++ b/src/couch_rate/src/couch_rate_config.erl
@@ -0,0 +1,66 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rate_config).
+
+% This parser supports only maps where key is atom and value
+% is positive float or positive integer.
+
+-include_lib("syntax_tools/include/merl.hrl").
+
+-export([
+ from_str/1,
+ to_str/1
+]).
+
+from_str(String) ->
+ parse_map(merl:quote(String)).
+
+
+to_str(Map) when is_map(Map) ->
+ StringArgs = maps:fold(fun(Key, Val, Acc) ->
+ Acc ++ [atom_to_list(Key) ++ " => " ++ number_to_list(Val)]
+ end, [], Map),
+ "#{" ++ string:join(StringArgs, ", ") ++ "}".
+
+
+number_to_list(Int) when is_integer(Int) ->
+ integer_to_list(Int);
+
+number_to_list(Float) when is_float(Float) ->
+ float_to_list(Float).
+
+
+parse_map(MapAST) ->
+ erl_syntax:type(MapAST) == map_expr
+ orelse fail("Only #{field => pos_integer() | float()} syntax is supported"),
+ %% Parsing map manually, since merl does not support maps
+ lists:foldl(fun(AST, Bindings) ->
+ NameAST = erl_syntax:map_field_assoc_name(AST),
+ erl_syntax:type(NameAST) == atom
+ orelse fail("Only atoms are supported as field names"),
+ Name = erl_syntax:atom_value(NameAST),
+ ValueAST = erl_syntax:map_field_assoc_value(AST),
+ Value = case erl_syntax:type(ValueAST) of
+ integer ->
+ erl_syntax:integer_value(ValueAST);
+ float ->
+ erl_syntax:float_value(ValueAST);
+ _ ->
+ fail("Only pos_integer() or float() alowed as values")
+ end,
+ Bindings#{Name => Value}
+ end, #{}, erl_syntax:map_expr_fields(MapAST)).
+
+
+fail(Msg) ->
+ throw({error, Msg}). \ No newline at end of file
diff --git a/src/couch_rate/src/couch_rate_ets.erl b/src/couch_rate/src/couch_rate_ets.erl
new file mode 100644
index 000000000..edd9d965c
--- /dev/null
+++ b/src/couch_rate/src/couch_rate_ets.erl
@@ -0,0 +1,119 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rate_ets).
+
+-include("couch_rate.hrl").
+
+-export([
+ create_tables/0,
+ delete_tables/0,
+ create_if_missing/2,
+ new/2,
+ lookup/2,
+ update/3
+]).
+
+
+-define(SHARDS_N, 16).
+
+-type id() :: term().
+-type state() :: term().
+-type result() :: term().
+-type store_state() :: term().
+
+
+-spec create_if_missing(couch_rate:id(), state()) ->
+ store_state().
+
+create_if_missing(Id, State) ->
+ Tid = term_to_table(Id),
+ case ets:lookup(Tid, Id) of
+ [_ | _] -> ok;
+ _ -> ets:insert(Tid, {Id, State})
+ end,
+ ok.
+
+
+-spec new(couch_rate:id(), state()) ->
+ store_state()
+ | {error, term()}.
+
+new(Id, State) ->
+ Tid = term_to_table(Id),
+ case ets:insert_new(Tid, {Id, State}) of
+ true -> ok;
+ false -> {error, #{reason => already_exists, id => Id}}
+ end.
+
+
+-spec update(id(), store_state(), fun(
+ (id(), state()) -> {state(), result()}
+ )) ->
+ result()
+ | {error, term()}.
+
+update(Id, _StoreState, Fun) ->
+ Tid = term_to_table(Id),
+ case ets:lookup(Tid, Id) of
+ [{Id, State0}] ->
+ case Fun(Id, State0) of
+ {Result, State1} ->
+ ets:insert(Tid, {Id, State1}),
+ Result;
+ Error ->
+ Error
+ end;
+ _ ->
+ {error, #{reason => cannot_find, id => Id}}
+ end.
+
+
+-spec lookup(id(), store_state()) ->
+ state()
+ | {error, term()}.
+
+lookup(Id, _StoreState) ->
+ Tid = term_to_table(Id),
+ case ets:lookup(Tid, Id) of
+ [{Id, State}] ->
+ State;
+ _ ->
+ {error, #{reason => cannot_find, id => Id}}
+ end.
+
+
+create_tables() ->
+ Opts = [named_table, public, {read_concurrency, true}],
+ [ets:new(TableName, Opts) || TableName <- table_names()],
+ ok.
+
+delete_tables() ->
+ [ets:delete(TableName) || TableName <- table_names()],
+ ok.
+
+
+-spec term_to_table(any()) -> atom().
+term_to_table(Term) ->
+ PHash = erlang:phash2(Term),
+ table_name(PHash rem ?SHARDS_N).
+
+
+-dialyzer({no_return, table_names/0}).
+
+-spec table_names() -> [atom()].
+table_names() ->
+ [table_name(N) || N <- lists:seq(0, ?SHARDS_N - 1)].
+
+-spec table_name(non_neg_integer()) -> atom().
+table_name(Id) when is_integer(Id), Id >= 0 andalso Id < ?SHARDS_N ->
+ list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(Id)). \ No newline at end of file
diff --git a/src/couch_rate/src/couch_rate_limiter.erl b/src/couch_rate/src/couch_rate_limiter.erl
new file mode 100644
index 000000000..349da8d5a
--- /dev/null
+++ b/src/couch_rate/src/couch_rate_limiter.erl
@@ -0,0 +1,387 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rate_limiter).
+
+%% This module implements an algorithm that controls the consumption rate
+%% through two parameters:
+%% - batch size
+%% - delay between batches
+%% The algorithm builds on:
+%% - [ascending minima algorithm](http://web.archive.org/web/20120805114719/http://home.tiac.net/~cri/2001/slidingmin.html)
+%% - "Welford's method" of calculating average
+
+-export([
+ new/2,
+ from_map/2,
+ budget/2,
+ delay/2,
+ wait/2,
+ in/3,
+ success/3,
+ failure/2,
+ is_congestion/2,
+ min_latency/2,
+ format/2,
+ to_map/2
+]).
+
+-type msec() :: non_neg_integer().
+
+-define(STATE, ?MODULE).
+
+%% This is the number below which the math would not work due to round errors
+%% In particular the default values for thresholds would be equal
+-define(MIN_TARGET, 36).
+
+-define(record_to_keyval(Name, Record),
+ lists:zip(record_info(fields, Name),
+ tl(tuple_to_list(Record)))).
+
+-define(map_to_record(RecordName, Map),
+ element(1, lists:foldl(fun(Field, {Record, Idx}) ->
+ {setelement(Idx, Record, maps:get(Field, Map, element(Idx, Record))), Idx + 1}
+ end, {#RecordName{}, 2}, record_info(fields, RecordName)))).
+
+
+-define(record_to_map(RecordName, Record),
+ element(1, lists:foldl(fun(Field, {Map, Idx}) ->
+ {
+ maps:put(Field, element(Idx, Record), Map),
+ Idx + 1
+ }
+ end, {#{}, 2}, record_info(fields, RecordName)))).
+
+%% Internal limiter state. The threshold defaults below are derived from
+%% the default target of 4500 msec; new/2 recomputes them from the actual
+%% target when the limiter is constructed.
+-record(?STATE, {
+    window_size = 0 :: 0 | pos_integer(),
+    %% 0-arity clock function; overridable in tests to fast-forward time.
+    timer = fun now_msec/0,
+    %% Number of elements currently tracked in the `minimums` queue.
+    size = 1 :: pos_integer(),
+    epoch = 1 :: pos_integer(),
+    %% Ascending-minima queue of {Death, Latency} pairs.
+    minimums :: queue:queue() | undefined,
+    start_ts = undefined,
+    mean_reads = 0.0,
+    mean_writes = 0.0,
+    reads = 0,
+    writes = 0,
+    target = 4500,
+    underload_threshold = 4275, %% target * 0.95
+    overload_threshold = 4725, %% target * 1.05
+    delay_threshold = 4950, %% target * 1.10; NOTE(review): new/2 computes target * 1.07 -- confirm which factor is intended
+    multiplicative_factor = 0.7,
+    regular_delay = 100 :: timeout(),
+    congested_delay = 5000 :: timeout(),
+    initial_budget = 100,
+    latency = 0
+}).
+
+-type state() :: #?STATE{}.
+
+
+-spec new(couch_rate:id(), Opts :: map()) -> state().
+
+new(_Id, #{sensitivity := S}) when S =< 0 ->
+ error("expected SensitivityTimeWindow > 0");
+
+new(_Id, #{target := T}) when T < ?MIN_TARGET ->
+ error("the target is too small");
+
+new(_Id, #{budget := B, target := T, window := W, sensitivity := S} = Opts) ->
+ WinSize = W div S + 1,
+ validate_arguments(?map_to_record(?STATE, maps:merge(#{
+ minimums => queue:new(),
+ window_size => WinSize,
+ initial_budget => B,
+ underload_threshold => round(T * 0.95),
+ overload_threshold => round(T * 1.05),
+ delay_threshold => round(T * 1.07)
+ }, maps:without([budget, window, sensitivity], Opts)))).
+
+
+-spec from_map(couch_rate:id(), map()) -> state().
+
+from_map(_Id, Map) ->
+ ?map_to_record(?STATE, Map).
+
+
+-spec budget(couch_rate:id(), state()) ->
+    {pos_integer(), state()}.
+
+%% Estimate how many documents the next batch should read, based on the
+%% congestion pattern derived from the rolling minimum latency. Always
+%% returns a budget of at least 1.
+budget(Id, #?STATE{} = State) ->
+    #?STATE{
+        reads = R,
+        writes = W,
+        mean_writes = MW,
+        mean_reads = MR,
+        multiplicative_factor = MultiplicativeFactor,
+        target = Target,
+        initial_budget = InitialBudget,
+        latency = Latency
+    } = State,
+    case pattern(Id, State) of
+        optimal ->
+            %% Hold steady at the observed mean read rate.
+            {max(1, round(MR)), State};
+        failed ->
+            %% decrease budget
+            {max(1, round(R * MultiplicativeFactor)), State};
+        overloaded ->
+            %% decrease budget
+            {max(1, round(R * MultiplicativeFactor)), State};
+        underloaded ->
+            %% Scale the budget up toward what the target latency allows.
+            %% NOTE(review): W == 0 or Latency == 0 here would raise
+            %% badarith -- presumably pattern/2 only reports underloaded
+            %% once writes and latency are non-zero; confirm.
+            ReadWriteRatio = min(1, MR / max(1, MW)),
+            SingleWrite = Latency / W,
+            EstimatedWrites = floor(Target / SingleWrite),
+            {max(1, round(ReadWriteRatio * EstimatedWrites)), State};
+        init ->
+            %% No measurements yet: start from the configured budget.
+            {InitialBudget, State}
+    end.
+
+-spec delay(couch_rate:id(), state()) ->
+ {pos_integer(), state()}.
+
+delay(Id, #?STATE{} = State) ->
+ #?STATE{
+ regular_delay = RD,
+ congested_delay = CD
+ } = State,
+ case pattern(Id, State) of
+ failed ->
+ {CD, State};
+ _ ->
+ {RD, State}
+ end.
+
+
+-spec wait(couch_rate:id(), state()) ->
+ ok.
+
+wait(Id, State) ->
+ {Delay, _} = delay(Id, State),
+ timer:sleep(Delay).
+
+
+-spec in(couch_rate:id(), state(), Reads :: pos_integer()) ->
+ {ok, state()}.
+
+in(_Id, #?STATE{timer = TimerFun} = State, Reads) ->
+ {ok, State#?STATE{
+ reads = Reads,
+ start_ts = TimerFun()
+ }}.
+
+
+-spec success(couch_rate:id(), state(), Writes :: pos_integer()) ->
+ {ok, state()}.
+
+success(_Id, #?STATE{start_ts = undefined} = State, _Writes) ->
+ {ok, State};
+
+success(_Id, #?STATE{} = State, Writes) ->
+ #?STATE{
+ start_ts = TS,
+ timer = TimerFun,
+ reads = Reads,
+ mean_reads = MeanReads,
+ mean_writes = MeanWrites,
+ window_size = WinSize
+ } = State,
+ {ok, update_min(State#?STATE{
+ writes = Writes,
+ mean_writes = average(MeanWrites, WinSize, Writes),
+ mean_reads = average(MeanReads, WinSize, Reads),
+ latency = TimerFun() - TS
+ })}.
+
+
+-spec failure(couch_rate:id(), state()) -> {ok, state()}.
+
+failure(_Id, #?STATE{start_ts = undefined} = State) ->
+ {ok, State};
+
+failure(_Id, #?STATE{} = State) ->
+ #?STATE{
+ timer = TimerFun,
+ start_ts = TS
+ } = State,
+ {ok, update_min(State#?STATE{
+ writes = 0,
+ latency = TimerFun() - TS
+ })}.
+
+
+-spec is_congestion(couch_rate:id(), state()) -> boolean().
+
+is_congestion(Id, #?STATE{} = State) ->
+ case pattern(Id, State) of
+ overloaded -> true;
+ failed -> true;
+ _ -> false
+ end.
+
+
+-spec format(couch_rate:id(), state()) -> [{Key :: atom(), Value :: term()}].
+
+format(_Id, #?STATE{minimums = M} = State) ->
+ Map = ?record_to_map(?STATE, State),
+ Minimums = lists:map(fun({D, V}) ->
+ [{value, V}, {death, D}]
+ end, queue:to_list(M)),
+ maps:to_list(maps:merge(Map, #{
+ minimums => Minimums
+ })).
+
+
+-spec to_map(couch_rate:id(), state()) -> map().
+
+to_map(_Id, #?STATE{} = State) ->
+ ?record_to_map(?STATE, State).
+
+
+-spec update_min(state()) -> state().
+
+update_min(#?STATE{latency = ProcessingDelay} = Q0) ->
+ Q1 = remove_greater_than(Q0, ProcessingDelay),
+ Q2 = append(Q1, ProcessingDelay),
+ maybe_remove_first(Q2).
+
+
+-spec pattern(couch_rate:id(), state()) ->
+    init
+    | underloaded
+    | overloaded
+    | optimal
+    | failed.
+
+%% Classify the current congestion regime from the rolling minimum latency.
+%% NOTE(review): because of clause order, a failed batch (writes == 0)
+%% whose minimum latency exceeds the thresholds is reported as overloaded
+%% or optimal rather than failed -- confirm this ordering is intentional.
+pattern(Id, #?STATE{} = State) ->
+    #?STATE{
+        underload_threshold = UnderloadThreshold,
+        overload_threshold = OverloadThreshold,
+        writes = W,
+        mean_writes = MW
+    } = State,
+    case min_latency(Id, State) of
+        MinRollingLatency when MinRollingLatency > OverloadThreshold ->
+            overloaded;
+        MinRollingLatency when MinRollingLatency > UnderloadThreshold ->
+            optimal;
+        MinRollingLatency when MinRollingLatency > 0 andalso W == 0 ->
+            failed;
+        MinRollingLatency when MinRollingLatency == 0 andalso MW == 0.0 ->
+            init;
+        _ ->
+            underloaded
+    end.
+
+
+-spec min_latency(couch_rate:id(), state()) -> pos_integer() | 0.
+
+min_latency(_Id, #?STATE{size = 1}) ->
+ 0;
+
+min_latency(_Id, #?STATE{minimums = Minimums}) ->
+ {value, {_, Min}} = head(Minimums),
+ Min.
+
+
+%% Validate a fully-built state record, crashing with a descriptive message
+%% on the first violated constraint; returns the state unchanged when every
+%% check passes. Messages are aligned with the guards: `window_size` and
+%% `initial_budget` guards reject values < 1 (so the requirement is "at
+%% least 1", not "greater than 1"), and the `multiplicative_factor` guard
+%% accepts 0 and 1, so the valid range is inclusive.
+validate_arguments(#?STATE{timer = TimerFun})
+    when not is_function(TimerFun, 0) ->
+    error("expected `timer` to be an arity 0 function");
+
+validate_arguments(#?STATE{window_size = WinSize})
+    when WinSize < 1 ->
+    error("expected `window_size` to be at least 1");
+
+validate_arguments(#?STATE{initial_budget = Budget})
+    when Budget < 1 ->
+    error("expected `initial_budget` to be at least 1");
+
+validate_arguments(#?STATE{overload_threshold = OT, target = T})
+    when OT =< T ->
+    error("expected `overload_threshold` to be greater than `target`");
+
+validate_arguments(#?STATE{underload_threshold = UT, target = T})
+    when UT >= T ->
+    error("expected `underload_threshold` to be less than `target`");
+
+validate_arguments(#?STATE{delay_threshold = DT, overload_threshold = OT})
+    when DT =< OT ->
+    error("expected `delay_threshold` to be greater than `overload_threshold`");
+
+validate_arguments(#?STATE{multiplicative_factor = MF})
+    when MF < 0 orelse MF > 1 ->
+    error("expected `multiplicative_factor` to be in the [0, 1] range");
+
+validate_arguments(#?STATE{} = State) ->
+    State.
+
+
+-spec remove_greater_than(state(), pos_integer()) -> state().
+
+remove_greater_than(#?STATE{minimums = Minimums, size = S} = State, Value) ->
+ case tail(Minimums) of
+ {value, {_, T}} when Value =< T ->
+ NewState = State#?STATE{minimums = tail_drop(Minimums), size = S - 1},
+ remove_greater_than(NewState, Value);
+ {value, _} ->
+ State;
+ empty ->
+ State#?STATE{epoch = 1}
+ end.
+
+
+-spec append(state(), pos_integer()) -> state().
+
+append(#?STATE{minimums = Minimums, epoch = E, window_size = S} = State, Value) ->
+ Death = E + S,
+ State#?STATE{
+ minimums = tail_put(Minimums, {Death, Value}),
+ epoch = E + 1,
+ size = S + 1
+ }.
+
+
+-spec maybe_remove_first(state()) -> state().
+
+maybe_remove_first(#?STATE{minimums = Minimums, epoch = E, size = S} = State) ->
+ case head(Minimums) of
+ {value, {E, _V}} ->
+ State#?STATE{minimums = head_drop(Minimums), size = S - 1};
+ _ ->
+ State
+ end.
+
+
+% Donald Knuth's Art of Computer Programming, Vol 2, 3rd edition, page 232:
+% Welford's method of computing a running mean
+average(Avg, WindowSize, Value) ->
+ Delta = Value - Avg,
+ Avg + Delta / WindowSize.
+
+%% The helper functions are added because queue module
+%% naming conventions are weird
+head(Q) -> queue:peek_r(Q).
+
+
+head_drop(Q) -> queue:drop_r(Q).
+
+tail(Q) -> queue:peek(Q).
+
+
+tail_put(Q, V) -> queue:in_r(V, Q).
+
+
+tail_drop(Q) -> queue:drop(Q).
+
+
+%% Current wall-clock time in milliseconds.
+%% NOTE(review): os:timestamp/0 is wall clock, not monotonic, so latencies
+%% computed from it can jump or go negative across clock adjustments --
+%% consider erlang:monotonic_time(millisecond) if that matters here.
+-spec now_msec() -> msec().
+now_msec() ->
+    {Mega, Sec, Micro} = os:timestamp(),
+ ((Mega * 1000000) + Sec) * 1000 + Micro div 1000. \ No newline at end of file
diff --git a/src/couch_rate/src/couch_rate_pd.erl b/src/couch_rate/src/couch_rate_pd.erl
new file mode 100644
index 000000000..5d79f7890
--- /dev/null
+++ b/src/couch_rate/src/couch_rate_pd.erl
@@ -0,0 +1,90 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rate_pd).
+
+-include("couch_rate.hrl").
+
+
+-export([
+ new/2,
+ create_if_missing/2,
+ lookup/2,
+ update/3
+]).
+
+-type id() :: term().
+-type state() :: term().
+-type result() :: term().
+-type store_state() :: term().
+
+-define(STATE_KEY, couch_rate_state).
+
+
+-spec create_if_missing(couch_rate:id(), state()) -> store_state().
+
+create_if_missing(Id, State) ->
+ case get({?STATE_KEY, Id}) of
+ undefined ->
+ put({?STATE_KEY, Id}, State),
+ ok;
+ _ ->
+ ok
+ end.
+
+
+-spec new(couch_rate:id(), state()) ->
+ store_state()
+ | {error, term()}.
+
+new(Id, State) ->
+ case get({?STATE_KEY, Id}) of
+ undefined ->
+ put({?STATE_KEY, Id}, State),
+ ok;
+ _ ->
+ {error, #{reason => already_exists, id => Id}}
+ end.
+
+
+-spec lookup(id(), store_state()) ->
+ state()
+ | {error, term()}.
+
+lookup(Id, _StoreState) ->
+ case get({?STATE_KEY, Id}) of
+ undefined ->
+ {error, #{reason => cannot_find, id => Id}};
+ State ->
+ State
+ end.
+
+
+-spec update(id(), store_state(), fun(
+    (id(), state()) -> {result(), state()}
+    )) ->
+    result()
+    | {error, term()}.
+
+%% Apply `Fun` to the state stored in the process dictionary for `Id`,
+%% store the updated state and return `Fun`'s result.
+%%
+%% Bug fix: the previous version matched the callback result against
+%% `{Result, State}` with `State` already bound to the *old* state, so any
+%% update that actually changed the state failed the match, skipped put/2,
+%% and leaked the raw `{Result, NewState}` tuple through the error clause.
+%% The -spec is also corrected to `{result(), state()}` to match the actual
+%% callback convention (same as couch_rate_ets).
+update(Id, _StoreState, Fun) ->
+    case get({?STATE_KEY, Id}) of
+        undefined ->
+            {error, #{reason => cannot_find, id => Id}};
+        State ->
+            case Fun(Id, State) of
+                {Result, NewState} ->
+                    put({?STATE_KEY, Id}, NewState),
+                    Result;
+                Error ->
+                    Error
+            end
+    end.
diff --git a/src/couch_rate/src/couch_rate_sup.erl b/src/couch_rate/src/couch_rate_sup.erl
new file mode 100644
index 000000000..1ce01b644
--- /dev/null
+++ b/src/couch_rate/src/couch_rate_sup.erl
@@ -0,0 +1,36 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rate_sup).
+-behaviour(supervisor).
+-vsn(1).
+
+-export([
+ start_link/0,
+ init/1
+]).
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+ couch_rate_ets:create_tables(),
+ Flags = #{
+ strategy => one_for_one,
+ intensity => 5,
+ period => 10
+ },
+ Children = [
+ ],
+ {ok, {Flags, Children}}. \ No newline at end of file
diff --git a/src/couch_rate/test/exunit/couch_rate_config_test.exs b/src/couch_rate/test/exunit/couch_rate_config_test.exs
new file mode 100644
index 000000000..7db30d272
--- /dev/null
+++ b/src/couch_rate/test/exunit/couch_rate_config_test.exs
@@ -0,0 +1,88 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+defmodule Couch.Rate.Config.Test do
+ use ExUnit.Case, async: true
+ use ExUnitProperties
+ import StreamData
+
+ @erlang_reserved_words MapSet.new([
+ "after",
+ "and",
+ "andalso",
+ "band",
+ "begin",
+ "bnot",
+ "bor",
+ "bsl",
+ "bsr",
+ "bxor",
+ "case",
+ "catch",
+ "cond",
+ "div",
+ "end",
+ "fun",
+ "if",
+ "let",
+ "not",
+ "of",
+ "or",
+ "orelse",
+ "receive",
+ "rem",
+ "try",
+ "when",
+ "xor"
+ ])
+
+ alias :couch_rate_config, as: RLC
+
+ test "parse valid configuration" do
+ parsed = RLC.from_str(~S(#{foo => 1, bar => 2.0}))
+ assert %{foo: 1, bar: 2} == parsed
+ end
+
+ property "roundtrip" do
+ check all(options <- valid_config()) do
+ parsed = RLC.from_str(RLC.to_str(options))
+ assert options == parsed
+ end
+ end
+
+ defp valid_config() do
+ map_of(
+ erlang_atom(),
+ one_of([
+ positive_integer(),
+ # we only support positive float
+ float(min: 0.0)
+ ])
+ )
+ end
+
+ defp erlang_atom() do
+ bind(string(:alphanumeric), fn str ->
+ bind(integer(?a..?z), fn char ->
+ erlang_atom(str, char)
+ end)
+ end)
+ end
+
+ defp erlang_atom(str, char) do
+ if MapSet.member?(@erlang_reserved_words, <<char, str::binary>>) do
+ String.to_atom(<<char, char, str::binary>>)
+ else
+ String.to_atom(<<char, str::binary>>)
+ end
+ end
+end
diff --git a/src/couch_rate/test/exunit/couch_rate_limiter_test.exs b/src/couch_rate/test/exunit/couch_rate_limiter_test.exs
new file mode 100644
index 000000000..ff70f793a
--- /dev/null
+++ b/src/couch_rate/test/exunit/couch_rate_limiter_test.exs
@@ -0,0 +1,350 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+defmodule Couch.Rate.Limiter.Test do
+ use ExUnit.Case, async: true
+
+ @transaction_timeout 5_000
+
+ alias :couch_rate, as: RL
+
+ describe "Stats :" do
+ @scenario %{rw_ratio: 1 / 1, target: 400, write_time: 100}
+ test "#{__ENV__.line} : #{inspect(@scenario)} (underloaded)" do
+ {rate_limiter, measurments} = simulate(@scenario, 1000)
+ stats = statistics(measurments)
+ maybe_debug(rate_limiter, measurments, stats)
+
+ assert stats.wait_time.p90 == 100,
+ "expected no artificial delays for more than 90% of batches"
+
+ budget = stats.budget
+
+ assert floor(budget.p95) in 1..7,
+ "expected budget to converge into the 1..7 range (got #{budget.p95})"
+
+ reads = stats.mean_reads
+
+ assert floor(reads.p95) in 1..7,
+ "expected mean_read to converge into the 1..7 range (got #{reads.p95})"
+
+ writes = stats.mean_writes
+ assert round(writes.p99) in 2..6
+ "expected mean_writes to converge into the 2..6 range (got #{writes.p95})"
+
+ assert stats.latency.p95 < @transaction_timeout,
+ "expected latency for 95% batches under @transaction_timout"
+
+ found_after = initial_search_speed(measurments)
+
+ assert found_after < 5,
+ "expected to find acceptable budget in less than 5 iterations (got: #{
+ found_after
+ })"
+
+ measurments
+ |> initial_search()
+ |> Enum.reduce(101, fn row, prev_budget ->
+ assert row.budget < prev_budget,
+ "expected to reduce budget while we fail"
+
+ row.budget
+ end)
+ end
+
+ @scenario %{rw_ratio: 1 / 8, target: 3900, write_time: 100}
+ test "#{__ENV__.line} : #{inspect(@scenario)} (optimal)" do
+ {rate_limiter, measurments} = simulate(@scenario, 1000)
+ stats = statistics(measurments)
+ maybe_debug(rate_limiter, measurments, stats)
+
+ assert stats.wait_time.p90 == 100,
+ "expected no artificial delays for more than 90% of batches"
+
+ budget = stats.budget
+
+ assert floor(budget.p95) in 4..7,
+ "expected budget to converge into the 4..7 range (got #{budget.p95})"
+
+ reads = stats.mean_reads
+
+ assert floor(reads.p95) in 4..7,
+ "expected mean_read to converge into the 4..7 range (got #{reads.p95})"
+
+ writes = stats.mean_writes
+ assert round(writes.p99) in 39..41
+ "expected mean_writes to converge into the 39..41 range (got #{writes.p95})"
+
+ assert stats.latency.p95 < @transaction_timeout,
+ "expected latency for 95% of batches under @transaction_timout"
+
+ found_after = initial_search_speed(measurments)
+
+ assert found_after < 10,
+ "expected to find acceptable budget in less than 10 iterations (got: #{
+ found_after
+ })"
+
+ measurments
+ |> initial_search()
+ |> Enum.reduce(101, fn row, prev_budget ->
+ assert row.budget < prev_budget,
+ "expected to reduce budget while we fail"
+
+ row.budget
+ end)
+ end
+
+    @scenario %{rw_ratio: 1 / 20, target: 3900, write_time: 100}
+    test "#{__ENV__.line} : #{inspect(@scenario)} (overloaded)" do
+      # This is a worst case scenario due to big variability of wait_time and
+      # big value read/write ratio
+      {rate_limiter, measurments} = simulate(@scenario, 1000)
+      stats = statistics(measurments)
+      maybe_debug(rate_limiter, measurments, stats)
+
+      assert stats.wait_time.p90 == 100,
+             "expected no artificial delays for more than 90% of batches"
+
+      budget = stats.budget
+
+      # Bug fix: the messages below were dangling string expressions (the
+      # comma after `assert` was missing), so failures printed no context.
+      assert floor(budget.p95) in 1..4,
+             "expected budget to converge into the 1..4 range (got #{budget.p95})"
+
+      reads = stats.mean_reads
+
+      assert floor(reads.p95) in 1..4,
+             "expected mean_read to converge into the 1..4 range (got #{reads.p95})"
+
+      writes = stats.mean_writes
+
+      # The message now reports p99, matching the asserted percentile.
+      assert round(writes.p99) in 39..41,
+             "expected mean_writes to converge into the 39..41 range (got #{writes.p99})"
+
+      assert stats.latency.p90 < @transaction_timeout,
+             "expected latency for 90% of batches under @transaction_timout"
+
+      found_after = initial_search_speed(measurments)
+
+      assert found_after < 16,
+             "expected to find acceptable budget in less than 16 iterations (got: #{
+               found_after
+             })"
+
+      measurments
+      |> initial_search()
+      |> Enum.reduce(101, fn row, prev_budget ->
+        assert row.budget < prev_budget,
+               "expected to reduce budget while we fail"
+
+        row.budget
+      end)
+    end
+ end
+
+ defp simulate(scenario, iterations) do
+ :couch_rate_ets.create_tables()
+
+ limiter =
+ RL.new(:limiter_id, :couch_rate_limiter, nil, %{
+ budget: 100,
+ target: scenario.target,
+ # average over 20 last measurments
+ window: scenario.write_time * 20,
+ sensitivity: scenario.write_time,
+ timer: &timer/0
+ })
+
+ result =
+ Enum.reduce(0..iterations, {limiter, []}, fn _idx, {limiter, stats} ->
+ {budget, limiter} = step(limiter, scenario.rw_ratio, scenario.write_time)
+ {limiter, update_measurments(limiter, stats, budget)}
+ end)
+
+ :couch_rate_ets.delete_tables()
+ result
+ end
+
+ defp step(limiter, read_write_ratio, write_time) do
+ {reads, limiter} = RL.budget(limiter)
+ writes = round(reads / read_write_ratio)
+ {delay, limiter} = RL.delay(limiter)
+ sleep(delay)
+ data_before = RL.to_map(limiter)
+ {:ok, limiter} = RL.in(limiter, reads)
+ data_after = RL.to_map(limiter)
+
+ assert data_after.size <= data_after.window_size + 1,
+ "The number of elements in minimums container shouldn't grow (got: #{
+ data_after.size
+ })"
+
+ if data_before.writes == 0 and
+ data_after.writes == 0 and
+ data_before.reads != 0 do
+ assert data_before.reads > data_after.reads,
+ "expected to reduce number of reads while transaction fails"
+ end
+
+ total_write_time =
+ 0..writes
+ |> Enum.reduce_while(0, fn _, acc ->
+ write_time = :rand.normal(write_time, write_time * 0.25)
+
+ if acc < @transaction_timeout do
+ {:cont, acc + write_time}
+ else
+ {:halt, acc}
+ end
+ end)
+
+ sleep(total_write_time)
+
+ if total_write_time < @transaction_timeout do
+ {:ok, limiter} = RL.success(limiter, writes)
+ {reads, limiter}
+ else
+ {:ok, limiter} = RL.failure(limiter)
+ {reads, limiter}
+ end
+ end
+
+ defp update_measurments(limiter, stats, budget) do
+ data = RL.to_map(limiter)
+ {wait_time, _} = RL.delay(limiter)
+
+ stats ++
+ [
+ %{
+ budget: budget,
+ slack: data.target - data.latency,
+ rw_ratio: data.mean_reads / max(1, data.mean_writes),
+ latency: data.latency,
+ new_budget: budget,
+ minimum_latency: RL.min_latency(limiter),
+ wait_time: wait_time,
+ elements_in_min_queue: data.size,
+ mean_reads: data.mean_reads,
+ mean_writes: data.mean_writes,
+ total_reads: data.reads,
+ total_writes: data.writes
+ }
+ ]
+ end
+
+ defp timer() do
+ now = Process.get(:time, 1)
+ Process.put(:time, now + 1)
+ now
+ end
+
+ defp sleep(sleep_time_in_ms) do
+ now = timer()
+ Process.put(:time, now + sleep_time_in_ms - 1)
+ end
+
+ defp format_table([first | _] = rows) do
+ spec =
+ first
+ |> Map.keys()
+ |> Enum.map(fn h -> {h, String.length(to_str(h))} end)
+
+ header = first |> Map.keys() |> Enum.map(&to_str/1) |> Enum.join(" , ")
+
+ lines =
+ Enum.map(rows, fn row ->
+ fields =
+ Enum.map(spec, fn {field, size} ->
+ String.pad_trailing("#{to_str(Map.get(row, field))}", size)
+ end)
+
+ Enum.join(fields, " , ")
+ end)
+
+ Enum.join([header | lines], "\n")
+ end
+
+ defp initial_search_speed(measurments) do
+ length(initial_search(measurments))
+ end
+
+ defp initial_search(measurments) do
+ Enum.reduce_while(measurments, [], fn row, acc ->
+ if row.total_writes == 0 do
+ {:cont, acc ++ [row]}
+ else
+ {:halt, acc}
+ end
+ end)
+ end
+
+ defp statistics(measurments) do
+ data =
+ Enum.reduce(measurments, %{}, fn row, acc ->
+ Enum.reduce(row, acc, fn {key, value}, acc ->
+ Map.update(acc, key, [], fn metric ->
+ metric ++ [value]
+ end)
+ end)
+ end)
+
+ Enum.reduce(data, %{}, fn {key, values}, acc ->
+ stats = Enum.into(:bear.get_statistics(values), %{})
+ {percentile, stats} = Map.pop(stats, :percentile)
+
+ stats =
+ Enum.reduce(percentile, stats, fn {key, value}, acc ->
+ Map.put(acc, String.to_atom("p#{to_str(key)}"), value)
+ end)
+
+ Map.put(acc, key, stats)
+ end)
+ end
+
+ defp format_stats(stats) do
+ rows =
+ Enum.map(stats, fn {key, values} ->
+ values
+ |> Enum.into(%{})
+ |> Map.put(:metric, key)
+ |> Map.delete(:histogram)
+ end)
+
+ format_table(rows)
+ end
+
+ defp to_str(int) when is_integer(int) do
+ "#{int}"
+ end
+
+ defp to_str(float) when is_float(float) do
+ "#{Float.to_string(Float.round(float, 2))}"
+ end
+
+ defp to_str(atom) when is_atom(atom) do
+ Atom.to_string(atom)
+ end
+
+ defp to_str(string) when is_binary(string) do
+ string
+ end
+
+ defp to_map(rate_limiter) do
+ RL.to_map(rate_limiter)
+ end
+
+ defp maybe_debug(rate_limiter, measurments, stats) do
+ if System.fetch_env("EXUNIT_DEBUG") != :error do
+ IO.puts("")
+ IO.puts("rate_limiter: #{inspect(to_map(rate_limiter))}")
+ IO.puts("measurments: #{inspect(measurments)}")
+ IO.puts("stats: #{inspect(stats)}")
+
+ IO.puts("\n" <> format_table(measurments) <> "\n" <> format_stats(stats))
+ end
+ end
+end
diff --git a/src/couch_rate/test/exunit/test_helper.exs b/src/couch_rate/test/exunit/test_helper.exs
new file mode 100644
index 000000000..9b9d6ef94
--- /dev/null
+++ b/src/couch_rate/test/exunit/test_helper.exs
@@ -0,0 +1,14 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
+ExUnit.start()
diff --git a/src/couch_views/README.md b/src/couch_views/README.md
index 49cd82b98..5647913f0 100644
--- a/src/couch_views/README.md
+++ b/src/couch_views/README.md
@@ -13,3 +13,36 @@ Code layout:
* `couch_views_fdb` - Maps view operations to FoundationDB logic.
* `couch_views_encoding` - Encodes view keys that are byte comparable following CouchDB view sort order.
* `couch_views_server` - Spawns `couch_views_indexer` workers to handle index update jobs.
+
+# Configuration
+
+## Configuring rate limiter
+
+Here is an example of the configuration used by the `couch_views` application:
+
+```
+[couch_rate.views]
+limiter = couch_rate_limiter
+opts = #{budget => 100, target => 2500, window => 60000, sensitivity => 1000}
+```
+
+Supported fields in `opts`:
+
+* `budget` - the initial value for estimated batch size
+* `target` - the amount in msec which we try to maintain for batch processing time
+* `window` - time interval for contention detector
+* `sensitivity` - minimal interval within the `window`
+
+Unsupported fields in `opts` (change these only if you really know what you are doing):
+
+* `window_size` - how many batches to consider in contention detector
+* `timer` - this is used for testing to fast forward time `fun() -> current_time_in_ms() end`
+* `underload_threshold` - a threshold below which we would try to increase the budget
+* `overload_threshold` - a threshold above which we would start decreasing the budget
+* `delay_threshold` - a threshold above which we would start introducing delays between batches
+* `multiplicative_factor` - determines how fast we are going to decrease budget (must be in the [0, 1] range)
+* `regular_delay` - delay between batches when there is no overload
+* `congested_delay` - delay between batches when there is an overload
+* `initial_budget` - initial value for budget to start with
+
diff --git a/src/couch_views/src/couch_views.app.src b/src/couch_views/src/couch_views.app.src
index 0d666affd..b704c9745 100644
--- a/src/couch_views/src/couch_views.app.src
+++ b/src/couch_views/src/couch_views.app.src
@@ -27,6 +27,7 @@
couch_stats,
fabric,
couch_jobs,
- couch_eval
+ couch_eval,
+ couch_rate
]}
]}.
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index b41d0679b..0127bacec 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -72,6 +72,8 @@ init() ->
fail_job(Job, Data, sig_changed, "Design document was modified")
end,
+ Limiter = couch_rate:create_if_missing({DbName, DDocId}, "views"),
+
State = #{
tx_db => undefined,
db_uuid => DbUUID,
@@ -81,7 +83,7 @@ init() ->
job => Job,
job_data => Data,
count => 0,
- limit => num_changes(),
+ limiter => Limiter,
doc_acc => [],
design_opts => Mrst#mrst.design_opts
},
@@ -94,6 +96,7 @@ init() ->
error:database_does_not_exist ->
fail_job(Job, Data, db_deleted, "Database was deleted");
Error:Reason ->
+ couch_rate:failure(Limiter),
NewRetry = Retries + 1,
RetryLimit = retry_limit(),
@@ -152,7 +155,25 @@ add_error(Error, Reason, Data) ->
update(#{} = Db, Mrst0, State0) ->
- {Mrst2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ Limiter = maps:get(limiter, State0),
+ case couch_rate:budget(Limiter) of
+ 0 ->
+ couch_rate:wait(Limiter),
+ update(Db, Mrst0, State0);
+ Limit ->
+ {Mrst1, State1} = do_update(Db, Mrst0, State0#{limit => Limit, limiter => Limiter}),
+ case State1 of
+ finished ->
+ couch_eval:release_map_context(Mrst1#mrst.qserver);
+ _ ->
+ couch_rate:wait(Limiter),
+ update(Db, Mrst1, State1)
+ end
+ end.
+
+
+do_update(Db, Mrst0, State0) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
% In the first iteration of update we need
% to populate our db and view sequences
State1 = case State0 of
@@ -174,14 +195,18 @@ update(#{} = Db, Mrst0, State0) ->
#{
count := Count,
- limit := Limit,
doc_acc := DocAcc,
- last_seq := LastSeq
+ last_seq := LastSeq,
+ limit := Limit,
+ limiter := Limiter
} = State2,
-
DocAcc1 = fetch_docs(TxDb, DocAcc),
+ couch_rate:in(Limiter, Count),
+
{Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
- write_docs(TxDb, Mrst1, MappedDocs, State2),
+ WrittenDocs = write_docs(TxDb, Mrst1, MappedDocs, State2),
+
+ couch_rate:success(Limiter, WrittenDocs),
case Count < Limit of
true ->
@@ -196,14 +221,7 @@ update(#{} = Db, Mrst0, State0) ->
view_seq := LastSeq
}}
end
- end),
-
- case State4 of
- finished ->
- couch_eval:release_map_context(Mrst2#mrst.qserver);
- _ ->
- update(Db, Mrst2, State4)
- end.
+ end).
fold_changes(State) ->
@@ -304,12 +322,14 @@ write_docs(TxDb, Mrst, Docs, State) ->
KeyLimit = key_size_limit(),
ValLimit = value_size_limit(),
- lists:foreach(fun(Doc0) ->
+ DocsNumber = lists:foldl(fun(Doc0, N) ->
Doc1 = calculate_kv_sizes(Mrst, Doc0, KeyLimit, ValLimit),
- couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc1)
- end, Docs),
+ couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc1),
+ N + 1
+ end, 0, Docs),
- couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+ couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq),
+ DocsNumber.
fetch_docs(Db, Changes) ->
@@ -472,10 +492,6 @@ fail_job(Job, Data, Error, Reason) ->
exit(normal).
-num_changes() ->
- config:get_integer("couch_views", "change_limit", 100).
-
-
retry_limit() ->
config:get_integer("couch_views", "retry_limit", 3).
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
index 8f8f3c5cb..43b58284d 100644
--- a/src/couch_views/test/couch_views_indexer_test.erl
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -50,7 +50,8 @@ indexer_test_() ->
?TDEF_FE(handle_size_value_limits),
?TDEF_FE(index_autoupdater_callback),
?TDEF_FE(handle_db_recreated_when_running),
- ?TDEF_FE(handle_db_recreated_after_finished)
+ ?TDEF_FE(handle_db_recreated_after_finished),
+ ?TDEF_FE(index_budget_is_changing)
]
}
}
@@ -375,6 +376,55 @@ index_autoupdater_callback(Db) ->
?assertEqual(ok, couch_views_jobs:wait_for_job(JobId, DbSeq)).
+index_budget_is_changing(Db) ->
+ ok = meck:new(couch_rate, [passthrough]),
+ ok = meck:expect(couch_rate, budget, fun(State) ->
+ meck:passthrough([State])
+ end),
+
+ LimiterOpts = #{
+ budget => 100,
+ sensitivity => 500,
+ target => 500,
+ timer => fun timer/0,
+ window => 2000
+ },
+
+ ok = meck:expect(couch_rate, create_if_missing, fun(Id, Module, Store, _Options) ->
+ meck:passthrough([Id, Module, Store, LimiterOpts])
+ end),
+
+ ok = meck:expect(couch_rate, wait, fun(State) ->
+ Delay = couch_rate:delay(State),
+ put(time, timer() + Delay - 1)
+ end),
+
+ DDoc = create_ddoc(),
+ Docs = lists:map(fun doc/1, lists:seq(1, 200)),
+
+ {ok, _} = fabric2_db:update_docs(Db, [DDoc | Docs], []),
+
+ {ok, _Out} = couch_views:query(
+ Db,
+ DDoc,
+ <<"map_fun2">>,
+ fun fold_fun/2,
+ [],
+ #mrargs{}
+ ),
+ ?assert(length(lists:usort(budget_history())) > 1).
+
+
+timer() ->
+ get(time) == undefined andalso put(time, 1),
+ Now = get(time),
+ put(time, Now + 1),
+ Now.
+
+
+budget_history() ->
+ [Result || {_Pid, {couch_rate, budget, _}, Result} <- meck:history(couch_rate)].
+
handle_db_recreated_when_running(Db) ->
DbName = fabric2_db:name(Db),
@@ -386,7 +436,8 @@ handle_db_recreated_when_running(Db) ->
% To intercept job building while it is running ensure updates happen one
% row at a time.
- config:set("couch_views", "change_limit", "1", false),
+ ok = meck:new(couch_rate, [passthrough]),
+ ok = meck:expect(couch_rate, budget, ['_'], meck:val(1)),
meck_intercept_job_update(self()),
diff --git a/src/couch_views/test/couch_views_trace_index_test.erl b/src/couch_views/test/couch_views_trace_index_test.erl
index c4f76d897..f8a5ce535 100644
--- a/src/couch_views/test/couch_views_trace_index_test.erl
+++ b/src/couch_views/test/couch_views_trace_index_test.erl
@@ -51,7 +51,7 @@ indexer_test_() ->
setup() ->
- test_util:start_couch([fabric, couch_js]).
+ test_util:start_couch([fabric, couch_js, couch_rate]).
cleanup(Ctx) ->