summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rel/overlay/etc/default.ini12
-rw-r--r--src/couch_views/src/couch_views_indexer.erl3
-rw-r--r--src/couch_views/src/couch_views_server.erl96
-rw-r--r--src/couch_views/test/couch_views_server_test.erl201
-rw-r--r--src/couch_views/test/couch_views_trace_index_test.erl2
5 files changed, 292 insertions, 22 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 43e1c0ba3..40a3b3179 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -294,11 +294,17 @@ iterations = 10 ; iterations for password hashing
; Settings for view indexing
[couch_views]
-; max_workers = 100
+; Maximum acceptors waiting to accept view indexing jobs
+;max_acceptors = 5
+;
+; Maximum number of view indexing workers
+;max_workers = 100
+;
; The maximum allowed key size emitted from a view for a document (in bytes)
-; key_size_limit = 8000
+;key_size_limit = 8000
+;
; The maximum allowed value size emitted from a view for a document (in bytes)
-; value_size_limit = 64000
+;value_size_limit = 64000
; CSP (Content Security Policy) Support for _utils
[csp]
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 4d09fdb6d..31868d9c0 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -46,6 +46,9 @@ spawn_link() ->
init() ->
Opts = #{no_schedule => true},
{ok, Job, Data0} = couch_jobs:accept(?INDEX_JOB_TYPE, Opts),
+
+ couch_views_server:accepted(self()),
+
Data = upgrade_data(Data0),
#{
<<"db_name">> := DbName,
diff --git a/src/couch_views/src/couch_views_server.erl b/src/couch_views/src/couch_views_server.erl
index d14216e40..e45a9f315 100644
--- a/src/couch_views/src/couch_views_server.erl
+++ b/src/couch_views/src/couch_views_server.erl
@@ -20,6 +20,9 @@
start_link/0
]).
+-export([
+ accepted/1
+]).
-export([
init/1,
@@ -30,7 +33,7 @@
code_change/3
]).
-
+-define(MAX_ACCEPTORS, 5).
-define(MAX_WORKERS, 100).
@@ -38,20 +41,44 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+accepted(Worker) when is_pid(Worker) ->
+ gen_server:call(?MODULE, {accepted, Worker}, infinity).
+
+
init(_) ->
process_flag(trap_exit, true),
couch_views_jobs:set_timeout(),
St = #{
+ acceptors => #{},
workers => #{},
+ max_acceptors => max_acceptors(),
max_workers => max_workers()
},
- {ok, spawn_workers(St)}.
+ {ok, spawn_acceptors(St)}.
terminate(_, _St) ->
ok.
+handle_call({accepted, Pid}, _From, St) ->
+ #{
+ acceptors := Acceptors,
+ workers := Workers
+ } = St,
+ case maps:is_key(Pid, Acceptors) of
+ true ->
+ St1 = St#{
+ acceptors := maps:remove(Pid, Acceptors),
+ workers := Workers#{Pid => true}
+ },
+ {reply, ok, spawn_acceptors(St1)};
+ false ->
+ LogMsg = "~p : unknown acceptor processs ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid]),
+ {stop, {unknown_acceptor_pid, Pid}, St}
+ end;
+
handle_call(Msg, _From, St) ->
{stop, {bad_call, Msg}, {bad_call, Msg}, St}.
@@ -61,19 +88,16 @@ handle_cast(Msg, St) ->
handle_info({'EXIT', Pid, Reason}, St) ->
- #{workers := Workers} = St,
- case maps:is_key(Pid, Workers) of
- true ->
- if Reason == normal -> ok; true ->
- LogMsg = "~p : indexer process ~p exited with ~p",
- couch_log:error(LogMsg, [?MODULE, Pid, Reason])
- end,
- NewWorkers = maps:remove(Pid, Workers),
- {noreply, spawn_workers(St#{workers := NewWorkers})};
- false ->
- LogMsg = "~p : unknown process ~p exited with ~p",
- couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
- {stop, {unknown_pid_exit, Pid}, St}
+ #{
+ acceptors := Acceptors,
+ workers := Workers
+ } = St,
+
+ % In Erlang 21+ could check map keys directly in the function head
+ case {maps:is_key(Pid, Acceptors), maps:is_key(Pid, Workers)} of
+ {true, false} -> handle_acceptor_exit(St, Pid, Reason);
+ {false, true} -> handle_worker_exit(St, Pid, Reason);
+ {false, false} -> handle_unknown_exit(St, Pid, Reason)
end;
handle_info(Msg, St) ->
@@ -84,20 +108,54 @@ code_change(_OldVsn, St, _Extra) ->
{ok, St}.
-spawn_workers(St) ->
+% Worker process exit handlers
+
+handle_acceptor_exit(#{acceptors := Acceptors} = St, Pid, Reason) ->
+ St1 = St#{acceptors := maps:remove(Pid, Acceptors)},
+ LogMsg = "~p : acceptor process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {noreply, spawn_acceptors(St1)}.
+
+
+handle_worker_exit(#{workers := Workers} = St, Pid, normal) ->
+ St1 = St#{workers := maps:remove(Pid, Workers)},
+ {noreply, spawn_acceptors(St1)};
+
+handle_worker_exit(#{workers := Workers} = St, Pid, Reason) ->
+ St1 = St#{workers := maps:remove(Pid, Workers)},
+ LogMsg = "~p : indexer process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {noreply, spawn_acceptors(St1)}.
+
+
+handle_unknown_exit(St, Pid, Reason) ->
+ LogMsg = "~p : unknown process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {stop, {unknown_pid_exit, Pid}, St}.
+
+
+spawn_acceptors(St) ->
#{
workers := Workers,
+ acceptors := Acceptors,
+ max_acceptors := MaxAcceptors,
max_workers := MaxWorkers
} = St,
- case maps:size(Workers) < MaxWorkers of
+ ACnt = maps:size(Acceptors),
+ WCnt = maps:size(Workers),
+ case ACnt < MaxAcceptors andalso (ACnt + WCnt) < MaxWorkers of
true ->
Pid = couch_views_indexer:spawn_link(),
- NewSt = St#{workers := Workers#{Pid => true}},
- spawn_workers(NewSt);
+ NewSt = St#{acceptors := Acceptors#{Pid => true}},
+ spawn_acceptors(NewSt);
false ->
St
end.
+max_acceptors() ->
+ config:get_integer("couch_views", "max_acceptors", ?MAX_ACCEPTORS).
+
+
max_workers() ->
config:get_integer("couch_views", "max_workers", ?MAX_WORKERS).
diff --git a/src/couch_views/test/couch_views_server_test.erl b/src/couch_views/test/couch_views_server_test.erl
new file mode 100644
index 000000000..17a3dc557
--- /dev/null
+++ b/src/couch_views/test/couch_views_server_test.erl
@@ -0,0 +1,201 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_server_test).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+couch_views_server_test_() ->
+ {
+ "Test couch_views_server",
+ {
+ setup,
+ fun setup/0,
+ fun cleanup/1,
+ {
+ foreach,
+ fun foreach_setup/0,
+ fun foreach_teardown/1,
+ [
+ ?TDEF_FE(max_acceptors_started),
+ ?TDEF_FE(acceptors_become_workers),
+ ?TDEF_FE(handle_worker_death),
+ ?TDEF_FE(handle_acceptor_death),
+ ?TDEF_FE(handle_unknown_process_death),
+ ?TDEF_FE(max_workers_limit_works)
+ ]
+ }
+ }
+ }.
+
+
+setup() ->
+ Ctx = test_util:start_couch([
+ fabric,
+ couch_jobs,
+ couch_rate,
+ couch_js,
+ couch_eval
+ ]),
+ Ctx.
+
+
+cleanup(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+foreach_setup() ->
+ config:set("couch_views", "max_acceptors", "2", false),
+ config:set("couch_views", "max_workers", "4", false),
+ meck:new(couch_views_server, [passthrough]),
+ meck:new(couch_views_indexer, [passthrough]),
+ meck:expect(couch_views_indexer, init, fun() ->
+ receive pls_accept -> ok end,
+ couch_views_server:accepted(self()),
+ receive pls_die -> ok end
+ end),
+ ok = application:start(couch_views).
+
+
+foreach_teardown(_) ->
+ ok = application:stop(couch_views),
+ meck:unload(),
+ config:delete("couch_views", "max_acceptors", false),
+ config:delete("couch_views", "max_workers", false),
+ ok.
+
+
+max_acceptors_started(_) ->
+ #{max_acceptors := MaxAcceptors, max_workers := MaxWorkers} = get_state(),
+ ?assertEqual(2, MaxAcceptors),
+ ?assertEqual(4, MaxWorkers),
+
+ ?assertEqual(0, maps:size(workers())),
+
+ [Pid1, Pid2] = maps:keys(acceptors()),
+ ?assert(is_pid(Pid1)),
+ ?assert(is_pid(Pid2)),
+ ?assert(is_process_alive(Pid1)),
+ ?assert(is_process_alive(Pid2)).
+
+
+acceptors_become_workers(_) ->
+ ?assertEqual(0, maps:size(workers())),
+
+ InitAcceptors = acceptors(),
+ accept_all(),
+
+ ?assertEqual(2, maps:size(acceptors())),
+ ?assertEqual(2, maps:size(workers())),
+
+ ?assertEqual(InitAcceptors, workers()).
+
+
+handle_worker_death(_) ->
+ [Pid1, Pid2] = maps:keys(acceptors()),
+ accept_all(),
+
+ % One worker exits normal
+ finish_normal([Pid1]),
+ ?assertEqual(2, maps:size(acceptors())),
+ ?assertEqual(1, maps:size(workers())),
+
+ % The other blows up with an error
+ finish_error([Pid2]),
+ ?assertEqual(2, maps:size(acceptors())),
+ ?assertEqual(0, maps:size(workers())).
+
+
+handle_acceptor_death(_) ->
+ [Pid1, Pid2] = maps:keys(acceptors()),
+ finish_error([Pid1]),
+
+ NewAcceptors = acceptors(),
+ ?assertEqual(2, maps:size(NewAcceptors)),
+ ?assert(lists:member(Pid2, maps:keys(NewAcceptors))),
+ ?assert(not lists:member(Pid1, maps:keys(NewAcceptors))).
+
+
+handle_unknown_process_death(_) ->
+ meck:reset(couch_views_server),
+ Pid = self(),
+ whereis(couch_views_server) ! {'EXIT', Pid, blah},
+ meck:wait(1, couch_views_server, terminate,
+ [{unknown_pid_exit, Pid}, '_'], 5000).
+
+
+max_workers_limit_works(_) ->
+ % Accept 2 jobs -> 2 workers
+ accept_all(),
+ ?assertEqual(2, maps:size(workers())),
+
+ % Accept 2 more jobs -> 4 workers
+ accept_all(),
+ ?assertEqual(0, maps:size(acceptors())),
+ ?assertEqual(4, maps:size(workers())),
+
+ % Kill 1 worker -> 1 acceptor and 3 workers
+ [Worker1 | _] = maps:keys(workers()),
+ finish_normal([Worker1]),
+ ?assertEqual(1, maps:size(acceptors())),
+ ?assertEqual(3, maps:size(workers())),
+
+ % Kill 2 more workers -> 2 acceptors and 1 worker
+ [Worker2, Worker3 | _] = maps:keys(workers()),
+ finish_normal([Worker2, Worker3]),
+ ?assertEqual(2, maps:size(acceptors())),
+ ?assertEqual(1, maps:size(workers())),
+
+ % Kill 1 last worker -> 2 acceptors and 0 workers
+ [Worker4] = maps:keys(workers()),
+ finish_normal([Worker4]),
+ ?assertEqual(2, maps:size(acceptors())),
+ ?assertEqual(0, maps:size(workers())).
+
+
+% Utility functions
+
+accept_all() ->
+ Acceptors = acceptors(),
+ meck:reset(couch_views_server),
+ [Pid ! pls_accept || Pid <- maps:keys(Acceptors)],
+ meck:wait(maps:size(Acceptors), couch_views_server, handle_call, 3, 5000).
+
+
+acceptors() ->
+ #{acceptors := Acceptors} = get_state(),
+ Acceptors.
+
+
+workers() ->
+ #{workers := Workers} = get_state(),
+ Workers.
+
+
+get_state() ->
+ sys:get_state(couch_views_server, infinity).
+
+
+finish_normal(Workers) when is_list(Workers) ->
+ meck:reset(couch_views_server),
+ [Pid ! pls_die || Pid <- Workers],
+ meck:wait(length(Workers), couch_views_server, handle_info,
+ [{'_', '_', normal}, '_'], 5000).
+
+
+finish_error(Workers) when is_list(Workers) ->
+ meck:reset(couch_views_server),
+ [exit(Pid, badness) || Pid <- Workers],
+ meck:wait(length(Workers), couch_views_server, handle_info,
+ [{'_', '_', badness}, '_'], 5000).
diff --git a/src/couch_views/test/couch_views_trace_index_test.erl b/src/couch_views/test/couch_views_trace_index_test.erl
index 5b15a4ce2..f5ea37982 100644
--- a/src/couch_views/test/couch_views_trace_index_test.erl
+++ b/src/couch_views/test/couch_views_trace_index_test.erl
@@ -88,6 +88,8 @@ trace_single_doc(Db) ->
meck:expect(couch_jobs, accept, 2, {ok, job, JobData}),
meck:expect(couch_jobs, update, 3, {ok, job}),
meck:expect(couch_jobs, finish, 3, ok),
+ meck:expect(couch_views_server, accepted, 1, ok),
+
put(erlfdb_trace, <<"views_write_one_doc">>),
couch_views_indexer:init(),