author     Nick Vatamaniuc <vatamane@apache.org>  2020-06-04 17:57:41 -0400
committer  Nick Vatamaniuc <vatamane@apache.org>  2020-06-08 01:14:52 -0400
commit     a71d63741ca8abaa04097b3373c1a2ec21595709
tree       52d0d28293d69ce139a19486848f0577dfcfb358
parent     19ae50815ca1016719f94a2757e08757b37fb949
Split couch_views acceptors and workers (branch: optimize-couch-views-acceptors)
Optimize couch_views by using a separate set of acceptors and workers. Previously, all `max_workers` were spawned on startup and waited to accept jobs in parallel. In a setup with a large number of pods, and 100 workers per pod, that could generate a lot of conflicts when all those workers raced to accept the same job at the same time.

The improvement is to spawn only a limited number of acceptors (5 by default), then spawn more as some of them become workers. Also, when workers finish or die with an error, check whether more acceptors could be spawned.

As an example, here is what might happen with `max_acceptors = 5` and `max_workers = 100` (`A` and `W` are the current counts of acceptors and workers, respectively); a sketch of the underlying capacity check follows the list:

 1. Starting out: `A = 5, W = 0`

 2. After 2 acceptors start running: `A = 3, W = 2`. Then, immediately, 2 more acceptors are spawned: `A = 5, W = 2`

 3. After 95 workers are started: `A = 5, W = 95`

 4. Now, if 3 acceptors accept, it would look like `A = 2, W = 98`, but no more acceptors would be started.

 5. If the last 2 acceptors also accept jobs: `A = 0, W = 100`. At this point no more indexing jobs can be accepted and started until at least one of the workers finishes and exits.

 6. If 1 worker exits: `A = 0, W = 99`, and an acceptor is immediately spawned: `A = 1, W = 99`

 7. If all 99 workers exit, it goes back to: `A = 5, W = 0`
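For illustration, here is a minimal sketch of the capacity rule that produces the counts above. It mirrors the spawn_acceptors/1 logic added in couch_views_server.erl below, but tracks plain integer counts instead of pid maps so it can be tried in an Erlang shell; the module and function names are hypothetical and not part of this patch:

    -module(capacity_sketch).
    -export([top_up/1]).

    %% St holds the current counts and the configured limits, e.g.
    %% #{acceptors => 0, workers => 99, max_acceptors => 5, max_workers => 100}.
    top_up(#{acceptors := A, workers := W,
             max_acceptors := MaxA, max_workers := MaxW} = St) ->
        case A < MaxA andalso A + W < MaxW of
            true ->
                %% Room for another acceptor: "spawn" one and keep topping up.
                top_up(St#{acceptors := A + 1});
            false ->
                St
        end.

Starting from `#{acceptors => 0, workers => 99, max_acceptors => 5, max_workers => 100}`, top_up/1 stops at one acceptor, matching step 6 above; with `workers => 0` it stops at five acceptors, matching steps 1 and 7.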
-rw-r--r--  rel/overlay/etc/default.ini                             12
-rw-r--r--  src/couch_views/src/couch_views_indexer.erl              3
-rw-r--r--  src/couch_views/src/couch_views_server.erl              96
-rw-r--r--  src/couch_views/test/couch_views_server_test.erl       201
-rw-r--r--  src/couch_views/test/couch_views_trace_index_test.erl    2
5 files changed, 292 insertions, 22 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 43e1c0ba3..40a3b3179 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -294,11 +294,17 @@ iterations = 10 ; iterations for password hashing
; Settings for view indexing
[couch_views]
-; max_workers = 100
+; Maximum acceptors waiting to accept view indexing jobs
+;max_acceptors = 5
+;
+; Maximum number of view indexing workers
+;max_workers = 100
+;
; The maximum allowed key size emitted from a view for a document (in bytes)
-; key_size_limit = 8000
+;key_size_limit = 8000
+;
; The maximum allowed value size emitted from a view for a document (in bytes)
-; value_size_limit = 64000
+;value_size_limit = 64000
; CSP (Content Security Policy) Support for _utils
[csp]
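For instance, an operator who wants different pool sizes would uncomment and adjust these settings in their local.ini; the values below are only illustrative, not recommendations from this patch:

    [couch_views]
    max_acceptors = 10
    max_workers = 200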
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 4d09fdb6d..31868d9c0 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -46,6 +46,9 @@ spawn_link() ->
init() ->
Opts = #{no_schedule => true},
{ok, Job, Data0} = couch_jobs:accept(?INDEX_JOB_TYPE, Opts),
+
+ couch_views_server:accepted(self()),
+
Data = upgrade_data(Data0),
#{
<<"db_name">> := DbName,
diff --git a/src/couch_views/src/couch_views_server.erl b/src/couch_views/src/couch_views_server.erl
index d14216e40..e45a9f315 100644
--- a/src/couch_views/src/couch_views_server.erl
+++ b/src/couch_views/src/couch_views_server.erl
@@ -20,6 +20,9 @@
start_link/0
]).
+-export([
+ accepted/1
+]).
-export([
init/1,
@@ -30,7 +33,7 @@
code_change/3
]).
-
+-define(MAX_ACCEPTORS, 5).
-define(MAX_WORKERS, 100).
@@ -38,20 +41,44 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+accepted(Worker) when is_pid(Worker) ->
+ gen_server:call(?MODULE, {accepted, Worker}, infinity).
+
+
init(_) ->
process_flag(trap_exit, true),
couch_views_jobs:set_timeout(),
St = #{
+ acceptors => #{},
workers => #{},
+ max_acceptors => max_acceptors(),
max_workers => max_workers()
},
- {ok, spawn_workers(St)}.
+ {ok, spawn_acceptors(St)}.
terminate(_, _St) ->
ok.
+handle_call({accepted, Pid}, _From, St) ->
+ #{
+ acceptors := Acceptors,
+ workers := Workers
+ } = St,
+ case maps:is_key(Pid, Acceptors) of
+ true ->
+ St1 = St#{
+ acceptors := maps:remove(Pid, Acceptors),
+ workers := Workers#{Pid => true}
+ },
+ {reply, ok, spawn_acceptors(St1)};
+ false ->
+            LogMsg = "~p : unknown acceptor process ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid]),
+ {stop, {unknown_acceptor_pid, Pid}, St}
+ end;
+
handle_call(Msg, _From, St) ->
{stop, {bad_call, Msg}, {bad_call, Msg}, St}.
@@ -61,19 +88,16 @@ handle_cast(Msg, St) ->
handle_info({'EXIT', Pid, Reason}, St) ->
- #{workers := Workers} = St,
- case maps:is_key(Pid, Workers) of
- true ->
- if Reason == normal -> ok; true ->
- LogMsg = "~p : indexer process ~p exited with ~p",
- couch_log:error(LogMsg, [?MODULE, Pid, Reason])
- end,
- NewWorkers = maps:remove(Pid, Workers),
- {noreply, spawn_workers(St#{workers := NewWorkers})};
- false ->
- LogMsg = "~p : unknown process ~p exited with ~p",
- couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
- {stop, {unknown_pid_exit, Pid}, St}
+ #{
+ acceptors := Acceptors,
+ workers := Workers
+ } = St,
+
+ % In Erlang 21+ could check map keys directly in the function head
+ case {maps:is_key(Pid, Acceptors), maps:is_key(Pid, Workers)} of
+ {true, false} -> handle_acceptor_exit(St, Pid, Reason);
+ {false, true} -> handle_worker_exit(St, Pid, Reason);
+ {false, false} -> handle_unknown_exit(St, Pid, Reason)
end;
handle_info(Msg, St) ->
@@ -84,20 +108,54 @@ code_change(_OldVsn, St, _Extra) ->
{ok, St}.
-spawn_workers(St) ->
+% Worker process exit handlers
+
+handle_acceptor_exit(#{acceptors := Acceptors} = St, Pid, Reason) ->
+ St1 = St#{acceptors := maps:remove(Pid, Acceptors)},
+ LogMsg = "~p : acceptor process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {noreply, spawn_acceptors(St1)}.
+
+
+handle_worker_exit(#{workers := Workers} = St, Pid, normal) ->
+ St1 = St#{workers := maps:remove(Pid, Workers)},
+ {noreply, spawn_acceptors(St1)};
+
+handle_worker_exit(#{workers := Workers} = St, Pid, Reason) ->
+ St1 = St#{workers := maps:remove(Pid, Workers)},
+ LogMsg = "~p : indexer process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {noreply, spawn_acceptors(St1)}.
+
+
+handle_unknown_exit(St, Pid, Reason) ->
+ LogMsg = "~p : unknown process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {stop, {unknown_pid_exit, Pid}, St}.
+
+
+spawn_acceptors(St) ->
#{
workers := Workers,
+ acceptors := Acceptors,
+ max_acceptors := MaxAcceptors,
max_workers := MaxWorkers
} = St,
- case maps:size(Workers) < MaxWorkers of
+ ACnt = maps:size(Acceptors),
+ WCnt = maps:size(Workers),
+ case ACnt < MaxAcceptors andalso (ACnt + WCnt) < MaxWorkers of
true ->
Pid = couch_views_indexer:spawn_link(),
- NewSt = St#{workers := Workers#{Pid => true}},
- spawn_workers(NewSt);
+ NewSt = St#{acceptors := Acceptors#{Pid => true}},
+ spawn_acceptors(NewSt);
false ->
St
end.
+max_acceptors() ->
+ config:get_integer("couch_views", "max_acceptors", ?MAX_ACCEPTORS).
+
+
max_workers() ->
config:get_integer("couch_views", "max_workers", ?MAX_WORKERS).
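A comment in the new handle_info/2 clause notes that on Erlang/OTP 21+ the map-key checks could move into the function head. That alternative is not part of this patch, but a rough sketch using the is_map_key/2 guard BIF (available since OTP 21) might look like:

    %% Sketch only; the module's other handle_info/2 clauses are omitted.
    handle_info({'EXIT', Pid, Reason}, #{acceptors := Acceptors} = St)
            when is_map_key(Pid, Acceptors) ->
        handle_acceptor_exit(St, Pid, Reason);
    handle_info({'EXIT', Pid, Reason}, #{workers := Workers} = St)
            when is_map_key(Pid, Workers) ->
        handle_worker_exit(St, Pid, Reason);
    handle_info({'EXIT', Pid, Reason}, St) ->
        handle_unknown_exit(St, Pid, Reason).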
diff --git a/src/couch_views/test/couch_views_server_test.erl b/src/couch_views/test/couch_views_server_test.erl
new file mode 100644
index 000000000..17a3dc557
--- /dev/null
+++ b/src/couch_views/test/couch_views_server_test.erl
@@ -0,0 +1,201 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_server_test).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+couch_views_server_test_() ->
+ {
+ "Test couch_views_server",
+ {
+ setup,
+ fun setup/0,
+ fun cleanup/1,
+ {
+ foreach,
+ fun foreach_setup/0,
+ fun foreach_teardown/1,
+ [
+ ?TDEF_FE(max_acceptors_started),
+ ?TDEF_FE(acceptors_become_workers),
+ ?TDEF_FE(handle_worker_death),
+ ?TDEF_FE(handle_acceptor_death),
+ ?TDEF_FE(handle_unknown_process_death),
+ ?TDEF_FE(max_workers_limit_works)
+ ]
+ }
+ }
+ }.
+
+
+setup() ->
+ Ctx = test_util:start_couch([
+ fabric,
+ couch_jobs,
+ couch_rate,
+ couch_js,
+ couch_eval
+ ]),
+ Ctx.
+
+
+cleanup(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+foreach_setup() ->
+ config:set("couch_views", "max_acceptors", "2", false),
+ config:set("couch_views", "max_workers", "4", false),
+ meck:new(couch_views_server, [passthrough]),
+ meck:new(couch_views_indexer, [passthrough]),
+ meck:expect(couch_views_indexer, init, fun() ->
+ receive pls_accept -> ok end,
+ couch_views_server:accepted(self()),
+ receive pls_die -> ok end
+ end),
+ ok = application:start(couch_views).
+
+
+foreach_teardown(_) ->
+ ok = application:stop(couch_views),
+ meck:unload(),
+ config:delete("couch_views", "max_acceptors", false),
+ config:delete("couch_views", "max_workers", false),
+ ok.
+
+
+max_acceptors_started(_) ->
+ #{max_acceptors := MaxAcceptors, max_workers := MaxWorkers} = get_state(),
+ ?assertEqual(2, MaxAcceptors),
+ ?assertEqual(4, MaxWorkers),
+
+ ?assertEqual(0, maps:size(workers())),
+
+ [Pid1, Pid2] = maps:keys(acceptors()),
+ ?assert(is_pid(Pid1)),
+ ?assert(is_pid(Pid2)),
+ ?assert(is_process_alive(Pid1)),
+ ?assert(is_process_alive(Pid2)).
+
+
+acceptors_become_workers(_) ->
+ ?assertEqual(0, maps:size(workers())),
+
+ InitAcceptors = acceptors(),
+ accept_all(),
+
+ ?assertEqual(2, maps:size(acceptors())),
+ ?assertEqual(2, maps:size(workers())),
+
+ ?assertEqual(InitAcceptors, workers()).
+
+
+handle_worker_death(_) ->
+ [Pid1, Pid2] = maps:keys(acceptors()),
+ accept_all(),
+
+    % One worker exits normally
+ finish_normal([Pid1]),
+ ?assertEqual(2, maps:size(acceptors())),
+ ?assertEqual(1, maps:size(workers())),
+
+ % The other blows up with an error
+ finish_error([Pid2]),
+ ?assertEqual(2, maps:size(acceptors())),
+ ?assertEqual(0, maps:size(workers())).
+
+
+handle_acceptor_death(_) ->
+ [Pid1, Pid2] = maps:keys(acceptors()),
+ finish_error([Pid1]),
+
+ NewAcceptors = acceptors(),
+ ?assertEqual(2, maps:size(NewAcceptors)),
+ ?assert(lists:member(Pid2, maps:keys(NewAcceptors))),
+ ?assert(not lists:member(Pid1, maps:keys(NewAcceptors))).
+
+
+handle_unknown_process_death(_) ->
+ meck:reset(couch_views_server),
+ Pid = self(),
+ whereis(couch_views_server) ! {'EXIT', Pid, blah},
+ meck:wait(1, couch_views_server, terminate,
+ [{unknown_pid_exit, Pid}, '_'], 5000).
+
+
+max_workers_limit_works(_) ->
+ % Accept 2 jobs -> 2 workers
+ accept_all(),
+ ?assertEqual(2, maps:size(workers())),
+
+ % Accept 2 more jobs -> 4 workers
+ accept_all(),
+ ?assertEqual(0, maps:size(acceptors())),
+ ?assertEqual(4, maps:size(workers())),
+
+ % Kill 1 worker -> 1 acceptor and 3 workers
+ [Worker1 | _] = maps:keys(workers()),
+ finish_normal([Worker1]),
+ ?assertEqual(1, maps:size(acceptors())),
+ ?assertEqual(3, maps:size(workers())),
+
+ % Kill 2 more workers -> 2 acceptors and 1 worker
+ [Worker2, Worker3 | _] = maps:keys(workers()),
+ finish_normal([Worker2, Worker3]),
+ ?assertEqual(2, maps:size(acceptors())),
+ ?assertEqual(1, maps:size(workers())),
+
+ % Kill 1 last worker -> 2 acceptors and 0 workers
+ [Worker4] = maps:keys(workers()),
+ finish_normal([Worker4]),
+ ?assertEqual(2, maps:size(acceptors())),
+ ?assertEqual(0, maps:size(workers())).
+
+
+% Utility functions
+
+accept_all() ->
+ Acceptors = acceptors(),
+ meck:reset(couch_views_server),
+ [Pid ! pls_accept || Pid <- maps:keys(Acceptors)],
+ meck:wait(maps:size(Acceptors), couch_views_server, handle_call, 3, 5000).
+
+
+acceptors() ->
+ #{acceptors := Acceptors} = get_state(),
+ Acceptors.
+
+
+workers() ->
+ #{workers := Workers} = get_state(),
+ Workers.
+
+
+get_state() ->
+ sys:get_state(couch_views_server, infinity).
+
+
+finish_normal(Workers) when is_list(Workers) ->
+ meck:reset(couch_views_server),
+ [Pid ! pls_die || Pid <- Workers],
+ meck:wait(length(Workers), couch_views_server, handle_info,
+ [{'_', '_', normal}, '_'], 5000).
+
+
+finish_error(Workers) when is_list(Workers) ->
+ meck:reset(couch_views_server),
+ [exit(Pid, badness) || Pid <- Workers],
+ meck:wait(length(Workers), couch_views_server, handle_info,
+ [{'_', '_', badness}, '_'], 5000).
diff --git a/src/couch_views/test/couch_views_trace_index_test.erl b/src/couch_views/test/couch_views_trace_index_test.erl
index 5b15a4ce2..f5ea37982 100644
--- a/src/couch_views/test/couch_views_trace_index_test.erl
+++ b/src/couch_views/test/couch_views_trace_index_test.erl
@@ -88,6 +88,8 @@ trace_single_doc(Db) ->
meck:expect(couch_jobs, accept, 2, {ok, job, JobData}),
meck:expect(couch_jobs, update, 3, {ok, job}),
meck:expect(couch_jobs, finish, 3, ok),
+ meck:expect(couch_views_server, accepted, 1, ok),
+
put(erlfdb_trace, <<"views_write_one_doc">>),
couch_views_indexer:init(),