diff options
-rw-r--r-- | rel/overlay/etc/default.ini | 12 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_indexer.erl | 3 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_server.erl | 96 | ||||
-rw-r--r-- | src/couch_views/test/couch_views_server_test.erl | 201 | ||||
-rw-r--r-- | src/couch_views/test/couch_views_trace_index_test.erl | 2 |
5 files changed, 292 insertions, 22 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 43e1c0ba3..40a3b3179 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -294,11 +294,17 @@ iterations = 10 ; iterations for password hashing ; Settings for view indexing [couch_views] -; max_workers = 100 +; Maximum acceptors waiting to accept view indexing jobs +;max_acceptors = 5 +; +; Maximum number of view indexing workers +;max_workers = 100 +; ; The maximum allowed key size emitted from a view for a document (in bytes) -; key_size_limit = 8000 +;key_size_limit = 8000 +; ; The maximum allowed value size emitted from a view for a document (in bytes) -; value_size_limit = 64000 +;value_size_limit = 64000 ; CSP (Content Security Policy) Support for _utils [csp] diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl index 4d09fdb6d..31868d9c0 100644 --- a/src/couch_views/src/couch_views_indexer.erl +++ b/src/couch_views/src/couch_views_indexer.erl @@ -46,6 +46,9 @@ spawn_link() -> init() -> Opts = #{no_schedule => true}, {ok, Job, Data0} = couch_jobs:accept(?INDEX_JOB_TYPE, Opts), + + couch_views_server:accepted(self()), + Data = upgrade_data(Data0), #{ <<"db_name">> := DbName, diff --git a/src/couch_views/src/couch_views_server.erl b/src/couch_views/src/couch_views_server.erl index d14216e40..e45a9f315 100644 --- a/src/couch_views/src/couch_views_server.erl +++ b/src/couch_views/src/couch_views_server.erl @@ -20,6 +20,9 @@ start_link/0 ]). +-export([ + accepted/1 +]). -export([ init/1, @@ -30,7 +33,7 @@ code_change/3 ]). - +-define(MAX_ACCEPTORS, 5). -define(MAX_WORKERS, 100). @@ -38,20 +41,44 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +accepted(Worker) when is_pid(Worker) -> + gen_server:call(?MODULE, {accepted, Worker}, infinity). + + init(_) -> process_flag(trap_exit, true), couch_views_jobs:set_timeout(), St = #{ + acceptors => #{}, workers => #{}, + max_acceptors => max_acceptors(), max_workers => max_workers() }, - {ok, spawn_workers(St)}. + {ok, spawn_acceptors(St)}. terminate(_, _St) -> ok. +handle_call({accepted, Pid}, _From, St) -> + #{ + acceptors := Acceptors, + workers := Workers + } = St, + case maps:is_key(Pid, Acceptors) of + true -> + St1 = St#{ + acceptors := maps:remove(Pid, Acceptors), + workers := Workers#{Pid => true} + }, + {reply, ok, spawn_acceptors(St1)}; + false -> + LogMsg = "~p : unknown acceptor processs ~p", + couch_log:error(LogMsg, [?MODULE, Pid]), + {stop, {unknown_acceptor_pid, Pid}, St} + end; + handle_call(Msg, _From, St) -> {stop, {bad_call, Msg}, {bad_call, Msg}, St}. @@ -61,19 +88,16 @@ handle_cast(Msg, St) -> handle_info({'EXIT', Pid, Reason}, St) -> - #{workers := Workers} = St, - case maps:is_key(Pid, Workers) of - true -> - if Reason == normal -> ok; true -> - LogMsg = "~p : indexer process ~p exited with ~p", - couch_log:error(LogMsg, [?MODULE, Pid, Reason]) - end, - NewWorkers = maps:remove(Pid, Workers), - {noreply, spawn_workers(St#{workers := NewWorkers})}; - false -> - LogMsg = "~p : unknown process ~p exited with ~p", - couch_log:error(LogMsg, [?MODULE, Pid, Reason]), - {stop, {unknown_pid_exit, Pid}, St} + #{ + acceptors := Acceptors, + workers := Workers + } = St, + + % In Erlang 21+ could check map keys directly in the function head + case {maps:is_key(Pid, Acceptors), maps:is_key(Pid, Workers)} of + {true, false} -> handle_acceptor_exit(St, Pid, Reason); + {false, true} -> handle_worker_exit(St, Pid, Reason); + {false, false} -> handle_unknown_exit(St, Pid, Reason) end; handle_info(Msg, St) -> @@ -84,20 +108,54 @@ code_change(_OldVsn, St, _Extra) -> {ok, St}. -spawn_workers(St) -> +% Worker process exit handlers + +handle_acceptor_exit(#{acceptors := Acceptors} = St, Pid, Reason) -> + St1 = St#{acceptors := maps:remove(Pid, Acceptors)}, + LogMsg = "~p : acceptor process ~p exited with ~p", + couch_log:error(LogMsg, [?MODULE, Pid, Reason]), + {noreply, spawn_acceptors(St1)}. + + +handle_worker_exit(#{workers := Workers} = St, Pid, normal) -> + St1 = St#{workers := maps:remove(Pid, Workers)}, + {noreply, spawn_acceptors(St1)}; + +handle_worker_exit(#{workers := Workers} = St, Pid, Reason) -> + St1 = St#{workers := maps:remove(Pid, Workers)}, + LogMsg = "~p : indexer process ~p exited with ~p", + couch_log:error(LogMsg, [?MODULE, Pid, Reason]), + {noreply, spawn_acceptors(St1)}. + + +handle_unknown_exit(St, Pid, Reason) -> + LogMsg = "~p : unknown process ~p exited with ~p", + couch_log:error(LogMsg, [?MODULE, Pid, Reason]), + {stop, {unknown_pid_exit, Pid}, St}. + + +spawn_acceptors(St) -> #{ workers := Workers, + acceptors := Acceptors, + max_acceptors := MaxAcceptors, max_workers := MaxWorkers } = St, - case maps:size(Workers) < MaxWorkers of + ACnt = maps:size(Acceptors), + WCnt = maps:size(Workers), + case ACnt < MaxAcceptors andalso (ACnt + WCnt) < MaxWorkers of true -> Pid = couch_views_indexer:spawn_link(), - NewSt = St#{workers := Workers#{Pid => true}}, - spawn_workers(NewSt); + NewSt = St#{acceptors := Acceptors#{Pid => true}}, + spawn_acceptors(NewSt); false -> St end. +max_acceptors() -> + config:get_integer("couch_views", "max_acceptors", ?MAX_ACCEPTORS). + + max_workers() -> config:get_integer("couch_views", "max_workers", ?MAX_WORKERS). diff --git a/src/couch_views/test/couch_views_server_test.erl b/src/couch_views/test/couch_views_server_test.erl new file mode 100644 index 000000000..17a3dc557 --- /dev/null +++ b/src/couch_views/test/couch_views_server_test.erl @@ -0,0 +1,201 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_server_test). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). + + +couch_views_server_test_() -> + { + "Test couch_views_server", + { + setup, + fun setup/0, + fun cleanup/1, + { + foreach, + fun foreach_setup/0, + fun foreach_teardown/1, + [ + ?TDEF_FE(max_acceptors_started), + ?TDEF_FE(acceptors_become_workers), + ?TDEF_FE(handle_worker_death), + ?TDEF_FE(handle_acceptor_death), + ?TDEF_FE(handle_unknown_process_death), + ?TDEF_FE(max_workers_limit_works) + ] + } + } + }. + + +setup() -> + Ctx = test_util:start_couch([ + fabric, + couch_jobs, + couch_rate, + couch_js, + couch_eval + ]), + Ctx. + + +cleanup(Ctx) -> + test_util:stop_couch(Ctx). + + +foreach_setup() -> + config:set("couch_views", "max_acceptors", "2", false), + config:set("couch_views", "max_workers", "4", false), + meck:new(couch_views_server, [passthrough]), + meck:new(couch_views_indexer, [passthrough]), + meck:expect(couch_views_indexer, init, fun() -> + receive pls_accept -> ok end, + couch_views_server:accepted(self()), + receive pls_die -> ok end + end), + ok = application:start(couch_views). + + +foreach_teardown(_) -> + ok = application:stop(couch_views), + meck:unload(), + config:delete("couch_views", "max_acceptors", false), + config:delete("couch_views", "max_workers", false), + ok. + + +max_acceptors_started(_) -> + #{max_acceptors := MaxAcceptors, max_workers := MaxWorkers} = get_state(), + ?assertEqual(2, MaxAcceptors), + ?assertEqual(4, MaxWorkers), + + ?assertEqual(0, maps:size(workers())), + + [Pid1, Pid2] = maps:keys(acceptors()), + ?assert(is_pid(Pid1)), + ?assert(is_pid(Pid2)), + ?assert(is_process_alive(Pid1)), + ?assert(is_process_alive(Pid2)). + + +acceptors_become_workers(_) -> + ?assertEqual(0, maps:size(workers())), + + InitAcceptors = acceptors(), + accept_all(), + + ?assertEqual(2, maps:size(acceptors())), + ?assertEqual(2, maps:size(workers())), + + ?assertEqual(InitAcceptors, workers()). + + +handle_worker_death(_) -> + [Pid1, Pid2] = maps:keys(acceptors()), + accept_all(), + + % One worker exits normal + finish_normal([Pid1]), + ?assertEqual(2, maps:size(acceptors())), + ?assertEqual(1, maps:size(workers())), + + % The other blows up with an error + finish_error([Pid2]), + ?assertEqual(2, maps:size(acceptors())), + ?assertEqual(0, maps:size(workers())). + + +handle_acceptor_death(_) -> + [Pid1, Pid2] = maps:keys(acceptors()), + finish_error([Pid1]), + + NewAcceptors = acceptors(), + ?assertEqual(2, maps:size(NewAcceptors)), + ?assert(lists:member(Pid2, maps:keys(NewAcceptors))), + ?assert(not lists:member(Pid1, maps:keys(NewAcceptors))). + + +handle_unknown_process_death(_) -> + meck:reset(couch_views_server), + Pid = self(), + whereis(couch_views_server) ! {'EXIT', Pid, blah}, + meck:wait(1, couch_views_server, terminate, + [{unknown_pid_exit, Pid}, '_'], 5000). + + +max_workers_limit_works(_) -> + % Accept 2 jobs -> 2 workers + accept_all(), + ?assertEqual(2, maps:size(workers())), + + % Accept 2 more jobs -> 4 workers + accept_all(), + ?assertEqual(0, maps:size(acceptors())), + ?assertEqual(4, maps:size(workers())), + + % Kill 1 worker -> 1 acceptor and 3 workers + [Worker1 | _] = maps:keys(workers()), + finish_normal([Worker1]), + ?assertEqual(1, maps:size(acceptors())), + ?assertEqual(3, maps:size(workers())), + + % Kill 2 more workers -> 2 acceptors and 1 worker + [Worker2, Worker3 | _] = maps:keys(workers()), + finish_normal([Worker2, Worker3]), + ?assertEqual(2, maps:size(acceptors())), + ?assertEqual(1, maps:size(workers())), + + % Kill 1 last worker -> 2 acceptors and 0 workers + [Worker4] = maps:keys(workers()), + finish_normal([Worker4]), + ?assertEqual(2, maps:size(acceptors())), + ?assertEqual(0, maps:size(workers())). + + +% Utility functions + +accept_all() -> + Acceptors = acceptors(), + meck:reset(couch_views_server), + [Pid ! pls_accept || Pid <- maps:keys(Acceptors)], + meck:wait(maps:size(Acceptors), couch_views_server, handle_call, 3, 5000). + + +acceptors() -> + #{acceptors := Acceptors} = get_state(), + Acceptors. + + +workers() -> + #{workers := Workers} = get_state(), + Workers. + + +get_state() -> + sys:get_state(couch_views_server, infinity). + + +finish_normal(Workers) when is_list(Workers) -> + meck:reset(couch_views_server), + [Pid ! pls_die || Pid <- Workers], + meck:wait(length(Workers), couch_views_server, handle_info, + [{'_', '_', normal}, '_'], 5000). + + +finish_error(Workers) when is_list(Workers) -> + meck:reset(couch_views_server), + [exit(Pid, badness) || Pid <- Workers], + meck:wait(length(Workers), couch_views_server, handle_info, + [{'_', '_', badness}, '_'], 5000). diff --git a/src/couch_views/test/couch_views_trace_index_test.erl b/src/couch_views/test/couch_views_trace_index_test.erl index 5b15a4ce2..f5ea37982 100644 --- a/src/couch_views/test/couch_views_trace_index_test.erl +++ b/src/couch_views/test/couch_views_trace_index_test.erl @@ -88,6 +88,8 @@ trace_single_doc(Db) -> meck:expect(couch_jobs, accept, 2, {ok, job, JobData}), meck:expect(couch_jobs, update, 3, {ok, job}), meck:expect(couch_jobs, finish, 3, ok), + meck:expect(couch_views_server, accepted, 1, ok), + put(erlfdb_trace, <<"views_write_one_doc">>), couch_views_indexer:init(), |