diff options
author | Nick Vatamaniuc <vatamane@gmail.com> | 2023-02-07 02:01:55 -0500 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2023-04-15 13:37:50 -0400 |
commit | 5642c405d1c219bf19576979ba79e6de7b011d00 (patch) | |
tree | a348b88ef0c5d9e277b42ecbf7a49acd75669c69 | |
parent | 83ffba4b3668ed83f0ee570de59dbda88b7b611a (diff) | |
download | couchdb-5642c405d1c219bf19576979ba79e6de7b011d00.tar.gz |
Improve couch_proc_manager
The main improvement is speeding up process lookup. This should result in
improved latency for concurrent requests which quickly acquire and
release couchjs processes. Testing with concurrent vdu and map/reduce calls
showed a 1.6 -> 6x performance speedup [1].
Previously, couch_proc_manager linearly searched through all the processes and
executed a custom callback function for each to match design doc IDs. Instead,
use a separate ets table index for idle processes to avoid scanning assigned
processes.
Use a db tag in addition to a ddoc id to quickly find idle processes. This could
improve performance, but if that's not the case, allow configuring the tagging
scheme to use a db prefix only, or disable the scheme altogether.
Use the new `map_get` ets select guard [2] to perform ddoc id lookups during
the ets select traversal without a custom matcher callback.
In ordered ets tables use the partially bound key trick [3]. This helps skip
scanning processes using a different query language altogether.
Waiting clients used `os:timestamp/0` as a unique client identifier. It turns
out, `os:timestamp/0` is not guaranteed to be unique and could result in some
clients never getting a response. This bug was mostly likely the reason the
"fifo client order" test had to be commented out. Fix the issue by using a
newer monotonic timestamp function, and for uniqueness add the client's
gen_server return tag at the end. Uncomment the previously commented out test
so it can hopefully run again.
When clients tag a previously untagged process, asynchronously replace the
untagged process with a new process. This happens in the background and the
client doesn't have to wait for it.
When a ddoc tagged process cannot be found, before giving up, stop the oldest
unused ddoc processes to allow spawning new fresh ones. To avoid doing a linear
scan here, keep a separate `?IDLE_ACCESS` index with an ordered list of idle
ddoc proceses sorted by their last usage time.
When processes are returned to the pool, quickly respond to the client with an
early return, instead of forcing them to wait until we re-insert the process
back into the idle ets table. This should improve client latency.
If the waiting client list gets long enough, where it waits longer than the
gen_server get_proc timeout, do not waste time assigning or spawning a new
process for that client, since it already timed-out.
When gathering stats, avoid making gen_server calls, at least for the total
number of processes spawned metric. Table sizes can be easily computed with
`ets:info(Table, size)` from outside the main process.
In addition to peformance improvements clean up the couch_proc_manager API by
forcing all the calls to go through properly exported functions instead of
doing direct gen_server calls.
Remove `#proc_int{}` and use only `#proc{}`. The cast to a list/tuple between
`#proc_int{}` and `#proc{}` was dangerous and it avoided the compiler checking
that we're using the proper fields. Adding an extra field to the record
resulted in mis-matched fields being assigned.
To simplify the code a bit, keep the per-language count in an ets table. This
helps not having to thread the old and updated state everywhere. Everything
else was mostly kept in ets tables anyway, so we're staying consistent with
that general pattern.
Improve test coverage and convert the tests to use the `?TDEF_FE` macro so
there is no need for the awkward `?_test(begin ... end)` construct.
[1] https://gist.github.com/nickva/f088accc958f993235e465b9591e5fac
[2] https://www.erlang.org/doc/apps/erts/match_spec.html
[3] https://www.erlang.org/doc/man/ets.html#table-traversal
-rw-r--r-- | rel/overlay/etc/default.ini | 7 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_show.erl | 6 | ||||
-rw-r--r-- | src/couch/include/couch_db.hrl | 9 | ||||
-rw-r--r-- | src/couch/src/couch_changes.erl | 2 | ||||
-rw-r--r-- | src/couch/src/couch_db.erl | 2 | ||||
-rw-r--r-- | src/couch/src/couch_db_updater.erl | 2 | ||||
-rw-r--r-- | src/couch/src/couch_doc.erl | 10 | ||||
-rw-r--r-- | src/couch/src/couch_native_process.erl | 2 | ||||
-rw-r--r-- | src/couch/src/couch_os_process.erl | 2 | ||||
-rw-r--r-- | src/couch/src/couch_proc_manager.erl | 651 | ||||
-rw-r--r-- | src/couch/src/couch_query_servers.erl | 109 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_auth_cache_tests.erl | 3 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_query_servers_tests.erl | 2 | ||||
-rw-r--r-- | src/couch/test/eunit/couchdb_os_proc_pool.erl | 715 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_show.erl | 6 | ||||
-rw-r--r-- | src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl | 2 |
16 files changed, 1034 insertions, 496 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 93aa1ca59..9c9167962 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -382,6 +382,13 @@ authentication_db = _users ;query_limit = 268435456 ;partition_query_limit = 268435456 +; Configure what to use as the db tag when selecting design doc couchjs +; processes. The choices are: +; - name (default) : Use the entire db name +; - prefix : Use only db prefix before the first "/" character +; - none : Do not use a db tag at all +;db_tag = name + [mango] ; Set to true to disable the "index all fields" text index, which can lead ; to out of memory issues when users have documents with nested array fields. diff --git a/src/chttpd/src/chttpd_show.erl b/src/chttpd/src/chttpd_show.erl index c2c37c66d..9391fa4a2 100644 --- a/src/chttpd/src/chttpd_show.erl +++ b/src/chttpd/src/chttpd_show.erl @@ -79,7 +79,7 @@ handle_doc_show(Req, Db, DDoc, ShowName, Doc, DocId) -> JsonReq = chttpd_external:json_req_obj(Req, Db, DocId), JsonDoc = couch_query_servers:json_doc(Doc), [<<"resp">>, ExternalResp] = - couch_query_servers:ddoc_prompt(DDoc, [<<"shows">>, ShowName], + couch_query_servers:ddoc_prompt(Db, DDoc, [<<"shows">>, ShowName], [JsonDoc, JsonReq]), JsonResp = apply_etag(ExternalResp, CurrentEtag), chttpd_external:send_external_response(Req, JsonResp) @@ -124,7 +124,7 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) -> JsonDoc = couch_query_servers:json_doc(Doc), Cmd = [<<"updates">>, UpdateName], W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), - UpdateResp = couch_query_servers:ddoc_prompt(DDoc, Cmd, [JsonDoc, JsonReq]), + UpdateResp = couch_query_servers:ddoc_prompt(Db, DDoc, Cmd, [JsonDoc, JsonReq]), JsonResp = case UpdateResp of [<<"up">>, {NewJsonDoc}, {JsonResp0}] -> case chttpd:header_value(Req, "X-Couch-Full-Commit", "false") of @@ -204,7 +204,7 @@ handle_view_list(Req, Db, DDoc, LName, {ViewDesignName, ViewName}, Keys) -> CB = fun list_cb/2, QueryArgs = couch_mrview_http:parse_body_and_query(Req, Keys), Options = [{user_ctx, Req#httpd.user_ctx}], - couch_query_servers:with_ddoc_proc(DDoc, fun(QServer) -> + couch_query_servers:with_ddoc_proc(Db, DDoc, fun(QServer) -> Acc = #lacc{ lname = LName, req = Req, diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl index 019c205ab..a15c65039 100644 --- a/src/couch/include/couch_db.hrl +++ b/src/couch/include/couch_db.hrl @@ -189,11 +189,14 @@ -record(proc, { pid, lang, - client = nil, - ddoc_keys = [], + client, + db_key, + ddoc_keys = #{}, prompt_fun, set_timeout_fun, - stop_fun + stop_fun, + threshold_ts, + last_use_ts }). -record(leaf, { diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl index 2078fed3a..2f0c545a6 100644 --- a/src/couch/src/couch_changes.erl +++ b/src/couch/src/couch_changes.erl @@ -233,7 +233,7 @@ filter(_Db, DocInfo, {design_docs, Style}) -> end; filter(Db, DocInfo, {view, Style, DDoc, VName}) -> Docs = open_revs(Db, DocInfo, Style), - {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs), + {ok, Passes} = couch_query_servers:filter_view(Db, DDoc, VName, Docs), filter_revs(Passes, Docs); filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) -> Req = case Req0 of diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index fdcf23e1b..333ed3683 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -950,7 +950,7 @@ load_validation_funs(#db{main_pid=Pid}=Db) -> end, DDocs = lists:map(OpenDocs, DDocInfos), Funs = lists:flatmap(fun(DDoc) -> - case couch_doc:get_validate_doc_fun(DDoc) of + case couch_doc:get_validate_doc_fun(Db, DDoc) of nil -> []; Fun -> [Fun] end diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl index 535acfad6..169fbc14d 100644 --- a/src/couch/src/couch_db_updater.erl +++ b/src/couch/src/couch_db_updater.erl @@ -330,7 +330,7 @@ refresh_validate_doc_funs(Db0) -> fun(DesignDocInfo) -> {ok, DesignDoc} = couch_db:open_doc_int( Db, DesignDocInfo, [ejson_body]), - case couch_doc:get_validate_doc_fun(DesignDoc) of + case couch_doc:get_validate_doc_fun(Db, DesignDoc) of nil -> []; Fun -> [Fun] end diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl index ec16d21db..e9265e7a0 100644 --- a/src/couch/src/couch_doc.erl +++ b/src/couch/src/couch_doc.erl @@ -16,7 +16,7 @@ -export([from_json_obj/1, from_json_obj_validate/1]). -export([from_json_obj/2, from_json_obj_validate/2]). -export([to_json_obj/2, has_stubs/1, merge_stubs/2]). --export([validate_docid/1, validate_docid/2, get_validate_doc_fun/1]). +-export([validate_docid/1, validate_docid/2, get_validate_doc_fun/2]). -export([doc_from_multi_part_stream/2, doc_from_multi_part_stream/3]). -export([doc_from_multi_part_stream/4]). -export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]). @@ -400,15 +400,15 @@ is_deleted(Tree) -> end. -get_validate_doc_fun({Props}) -> - get_validate_doc_fun(couch_doc:from_json_obj({Props})); -get_validate_doc_fun(#doc{body={Props}}=DDoc) -> +get_validate_doc_fun(Db, {Props}) -> + get_validate_doc_fun(Db, couch_doc:from_json_obj({Props})); +get_validate_doc_fun(Db, #doc{body={Props}}=DDoc) -> case couch_util:get_value(<<"validate_doc_update">>, Props) of undefined -> nil; _Else -> fun(EditDoc, DiskDoc, Ctx, SecObj) -> - couch_query_servers:validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) + couch_query_servers:validate_doc_update(Db, DDoc, EditDoc, DiskDoc, Ctx, SecObj) end end. diff --git a/src/couch/src/couch_native_process.erl b/src/couch/src/couch_native_process.erl index eee8b2860..20378bf84 100644 --- a/src/couch/src/couch_native_process.erl +++ b/src/couch/src/couch_native_process.erl @@ -115,7 +115,7 @@ handle_cast(_Msg, State) -> {noreply, State, State#evstate.idle}. handle_info(timeout, State) -> - gen_server:cast(couch_proc_manager, {os_proc_idle, self()}), + couch_proc_manager:os_proc_idle(self()), erlang:garbage_collect(), {noreply, State, State#evstate.idle}; handle_info({'EXIT',_,normal}, State) -> diff --git a/src/couch/src/couch_os_process.erl b/src/couch/src/couch_os_process.erl index 63a241433..675f10538 100644 --- a/src/couch/src/couch_os_process.erl +++ b/src/couch/src/couch_os_process.erl @@ -229,7 +229,7 @@ handle_cast(Msg, #os_proc{idle=Idle}=OsProc) -> {noreply, OsProc, Idle}. handle_info(timeout, #os_proc{idle=Idle}=OsProc) -> - gen_server:cast(couch_proc_manager, {os_proc_idle, self()}), + couch_proc_manager:os_proc_idle(self()), erlang:garbage_collect(), {noreply, OsProc, Idle}; handle_info({Port, {exit_status, 0}}, #os_proc{port=Port}=OsProc) -> diff --git a/src/couch/src/couch_proc_manager.erl b/src/couch/src/couch_proc_manager.erl index e7a25a6d2..42c8958f9 100644 --- a/src/couch/src/couch_proc_manager.erl +++ b/src/couch/src/couch_proc_manager.erl @@ -17,6 +17,10 @@ -export([ start_link/0, + get_proc/3, + get_proc/1, + ret_proc/1, + os_proc_idle/1, get_proc_count/0, get_stale_proc_count/0, new_proc/1, @@ -45,11 +49,13 @@ -define(WAITERS, couch_proc_manager_waiters). -define(OPENING, couch_proc_manager_opening). -define(SERVERS, couch_proc_manager_servers). +-define(COUNTERS, couch_proc_manager_counters). +-define(IDLE_BY_DB, couch_proc_manager_idle_by_db). +-define(IDLE_ACCESS, couch_proc_manager_idle_access). -define(RELISTEN_DELAY, 5000). -record(state, { config, - counts, threshold_ts, hard_limit, soft_limit @@ -59,31 +65,44 @@ -type revision() :: {integer(), binary()}. -record(client, { - timestamp :: os:timestamp() | '_', - from :: undefined | {pid(), reference()} | '_', + wait_key :: {binary(), integer(), gen_server:reply_tag()} | '_', + from :: undefined | {pid(), gen_server:reply_tag()} | '_', lang :: binary() | '_', - ddoc :: #doc{} | '_', + ddoc :: undefined | #doc{} | '_', + db_key :: undefined | binary(), ddoc_key :: undefined | {DDocId :: docid(), Rev :: revision()} | '_' }). --record(proc_int, { - pid, - lang, - client, - ddoc_keys = [], - prompt_fun, - set_timeout_fun, - stop_fun, - t0 = os:timestamp() -}). - start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +get_proc(#doc{body = {Props}} = DDoc, DbKey, {_DDocId, _Rev} = DDocKey) -> + LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>), + Lang = couch_util:to_binary(LangStr), + Client = #client{lang = Lang, ddoc = DDoc, db_key = DbKey, ddoc_key = DDocKey}, + Timeout = get_os_process_timeout(), + gen_server:call(?MODULE, {get_proc, Client}, Timeout). + +get_proc(LangStr) -> + Lang = couch_util:to_binary(LangStr), + Client = #client{lang = Lang}, + Timeout = get_os_process_timeout(), + gen_server:call(?MODULE, {get_proc, Client}, Timeout). + +ret_proc(#proc{} = Proc) -> + gen_server:call(?MODULE, {ret_proc, Proc}, infinity). + +os_proc_idle(Proc) when is_pid(Proc) -> + gen_server:cast(?MODULE, {os_proc_idle, Proc}). get_proc_count() -> - gen_server:call(?MODULE, get_proc_count). + try + ets:info(?PROCS, size) + ets:info(?OPENING, size) + catch + error:badarg -> + 0 + end. get_stale_proc_count() -> @@ -102,11 +121,27 @@ init([]) -> process_flag(trap_exit, true), ok = config:listen_for_changes(?MODULE, undefined), - TableOpts = [public, named_table, ordered_set], - ets:new(?PROCS, TableOpts ++ [{keypos, #proc_int.pid}]), - ets:new(?WAITERS, TableOpts ++ [{keypos, #client.timestamp}]), - ets:new(?OPENING, [public, named_table, set]), - ets:new(?SERVERS, [public, named_table, set]), + % Main process table. Pid -> #proc{} + ets:new(?PROCS, [named_table, {read_concurrency, true}, {keypos, #proc.pid}]), + + % #client{} waiters ordered by {Lang, timestamp(), Ref} + ets:new(?WAITERS, [named_table, ordered_set, {keypos, #client.wait_key}]), + + % Async process openers. Pid -> #client{} + ets:new(?OPENING, [named_table]), + + % Configured language servers Lang -> Start MFA | Command + ets:new(?SERVERS, [named_table]), + + % Idle Pids. Ordered to allow partial key lookups {Lang, DbKey, Pid} -> DDocs + ets:new(?IDLE_BY_DB, [named_table, ordered_set]), + + % Idle Db tagged pids ordered by last use. {Lang, timestamp(), Pid} -> true + ets:new(?IDLE_ACCESS, [named_table, ordered_set]), + + % Lang -> number of procs spawn for that lang + ets:new(?COUNTERS, [named_table]), + ets:insert(?SERVERS, get_servers_from_env("COUCHDB_QUERY_SERVER_")), ets:insert(?SERVERS, get_servers_from_env("COUCHDB_NATIVE_QUERY_SERVER_")), ets:insert(?SERVERS, [{"QUERY", {mango_native_proc, start_link, []}}]), @@ -114,99 +149,82 @@ init([]) -> {ok, #state{ config = get_proc_config(), - counts = dict:new(), - threshold_ts = os:timestamp(), + threshold_ts = timestamp(), hard_limit = get_hard_limit(), soft_limit = get_soft_limit() }}. terminate(_Reason, _State) -> - ets:foldl(fun(#proc_int{pid=P}, _) -> - couch_util:shutdown_sync(P) - end, 0, ?PROCS), - ok. + foreach_proc(fun(#proc{pid = P}) -> couch_util:shutdown_sync(P) end). -handle_call(get_proc_count, _From, State) -> - NumProcs = ets:info(?PROCS, size), - NumOpening = ets:info(?OPENING, size), - {reply, NumProcs + NumOpening, State}; handle_call(get_stale_proc_count, _From, State) -> #state{threshold_ts = T0} = State, - MatchSpec = [{#proc_int{t0='$1', _='_'}, [{'<', '$1', {T0}}], [true]}], + MatchSpec = [{#proc{threshold_ts = '$1', _ = '_'}, [{'<', '$1', T0}], [true]}], {reply, ets:select_count(?PROCS, MatchSpec), State}; +handle_call({get_proc, #client{} = Client}, From, State) -> + add_waiting_client(Client#client{from = From}), + ok = flush_waiters(State, Client#client.lang), + {noreply, State}; +handle_call({ret_proc, #proc{} = Proc}, From, State) -> + #proc{client = Ref, pid = Pid} = Proc, -handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) -> - LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>), - Lang = couch_util:to_binary(LangStr), - Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey}, - add_waiting_client(Client), - {noreply, flush_waiters(State, Lang)}; - -handle_call({get_proc, LangStr}, From, State) -> - Lang = couch_util:to_binary(LangStr), - Client = #client{from=From, lang=Lang}, - add_waiting_client(Client), - {noreply, flush_waiters(State, Lang)}; -handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) -> erlang:demonitor(Ref, [flush]), - NewState = case ets:lookup(?PROCS, Proc#proc.pid) of - [#proc_int{}=ProcInt] -> - return_proc(State, ProcInt); + gen_server:reply(From, true), + case ets:lookup(?PROCS, Pid) of + [#proc{} = ProcInt] -> + ok = return_proc(State, ProcInt); [] -> % Proc must've died and we already % cleared it out of the table in % the handle_info clause. - State + ok end, - {reply, true, NewState}; + {noreply, State}; handle_call(set_threshold_ts, _From, State) -> - FoldFun = fun - (#proc_int{client = undefined} = Proc, StateAcc) -> - remove_proc(StateAcc, Proc); - (_, StateAcc) -> - StateAcc + Fun = fun + (#proc{client = undefined} = Proc) -> ok = remove_proc(Proc); + (_) -> ok end, - NewState = ets:foldl(FoldFun, State, ?PROCS), - {reply, ok, NewState#state{threshold_ts = os:timestamp()}}; + ok = foreach_proc(Fun), + {reply, ok, State#state{threshold_ts = timestamp()}}; handle_call(terminate_stale_procs, _From, #state{threshold_ts = Ts1} = State) -> - FoldFun = fun - (#proc_int{client = undefined, t0 = Ts2} = Proc, StateAcc) -> + Fun = fun + (#proc{client = undefined, threshold_ts = Ts2} = Proc) -> case Ts1 > Ts2 of - true -> - remove_proc(StateAcc, Proc); - false -> - StateAcc + true -> ok = remove_proc(Proc); + false -> ok end; - (_, StateAcc) -> - StateAcc + (_) -> + ok end, - NewState = ets:foldl(FoldFun, State, ?PROCS), - {reply, ok, NewState}; + foreach_proc(Fun), + {reply, ok, State}; handle_call(_Call, _From, State) -> {reply, ignored, State}. - -handle_cast({os_proc_idle, Pid}, #state{counts=Counts}=State) -> - NewState = case ets:lookup(?PROCS, Pid) of - [#proc_int{client=undefined, lang=Lang}=Proc] -> - case dict:find(Lang, Counts) of - {ok, Count} when Count >= State#state.soft_limit -> - couch_log:info("Closing idle OS Process: ~p", [Pid]), - remove_proc(State, Proc); - {ok, _} -> - State +handle_cast({os_proc_idle, Pid}, #state{soft_limit = SoftLimit} = State) -> + case ets:lookup(?PROCS, Pid) of + [#proc{client = undefined, db_key = DbKey, lang = Lang} = Proc] -> + IsOverSoftLimit = get_count(Lang) >= SoftLimit, + IsTagged = DbKey =/= undefined, + case IsOverSoftLimit orelse IsTagged of + true -> + couch_log:debug("Closing tagged or idle OS Process: ~p", [Pid]), + ok = remove_proc(Proc); + false -> + ok end; _ -> State end, - {noreply, NewState}; + {noreply, State}; handle_cast(reload_config, State) -> NewState = State#state{ @@ -215,47 +233,59 @@ handle_cast(reload_config, State) -> soft_limit = get_soft_limit() }, maybe_configure_erlang_native_servers(), - {noreply, flush_waiters(NewState)}; - + lists:foreach( + fun({Lang, _}) -> + ok = flush_waiters(NewState, Lang) + end, + ets:tab2list(?COUNTERS) + ), + {noreply, NewState}; handle_cast(_Msg, State) -> {noreply, State}. handle_info(shutdown, State) -> {stop, shutdown, State}; - +handle_info({'EXIT', Pid, {spawn_ok, Proc0, undefined = _From}}, State) -> + % Use ets:take/2 to assert that opener existed before removing. Also assert that + % the pid matches and the the client was a bogus client + [{Pid, #client{from = undefined}}] = ets:take(?OPENING, Pid), + Proc = Proc0#proc{client = undefined}, + link(Proc#proc.pid), + ets:insert(?PROCS, Proc), + insert_idle_by_db(Proc), + {noreply, State}; handle_info({'EXIT', Pid, {spawn_ok, Proc0, {ClientPid,_} = From}}, State) -> - ets:delete(?OPENING, Pid), - link(Proc0#proc_int.pid), + % Use ets:take/2 to assert that opener existed before removing + [{Pid, #client{}}] = ets:take(?OPENING, Pid), + link(Proc0#proc.pid), Proc = assign_proc(ClientPid, Proc0), gen_server:reply(From, {ok, Proc, State#state.config}), {noreply, State}; handle_info({'EXIT', Pid, spawn_error}, State) -> - [{Pid, #client{lang=Lang}}] = ets:lookup(?OPENING, Pid), - ets:delete(?OPENING, Pid), - NewState = State#state{ - counts = dict:update_counter(Lang, -1, State#state.counts) - }, - {noreply, flush_waiters(NewState, Lang)}; + % Assert when removing that we always expect the opener to have been there + [{Pid, #client{lang = Lang}}] = ets:take(?OPENING, Pid), + dec_count(Lang), + ok = flush_waiters(State, Lang), + {noreply, State}; handle_info({'EXIT', Pid, Reason}, State) -> couch_log:info("~p ~p died ~p", [?MODULE, Pid, Reason]), case ets:lookup(?PROCS, Pid) of - [#proc_int{} = Proc] -> - NewState = remove_proc(State, Proc), - {noreply, flush_waiters(NewState, Proc#proc_int.lang)}; - [] -> - {noreply, State} - end; - -handle_info({'DOWN', Ref, _, _, _Reason}, State0) -> - case ets:match_object(?PROCS, #proc_int{client=Ref, _='_'}) of - [#proc_int{} = Proc] -> - {noreply, return_proc(State0, Proc)}; + [#proc{} = Proc] -> + ok = remove_proc(Proc), + ok = flush_waiters(State, Proc#proc.lang); [] -> - {noreply, State0} - end; + ok + end, + {noreply, State}; +handle_info({'DOWN', Ref, _, _, _Reason}, #state{} = State) -> + case ets:match_object(?PROCS, #proc{client = Ref, _ = '_'}) of + [#proc{} = Proc] -> ok = return_proc(State, Proc); + [] -> ok + end, + {noreply, State}; handle_info(restart_config_listener, State) -> @@ -284,73 +314,97 @@ handle_config_change("query_server_config", _, _, _, _) -> handle_config_change(_, _, _, _, _) -> {ok, undefined}. - -find_proc(#client{lang = Lang, ddoc_key = undefined}) -> - Pred = fun(_) -> - true - end, - find_proc(Lang, Pred); -find_proc(#client{lang = Lang, ddoc = DDoc, ddoc_key = DDocKey} = Client) -> - Pred = fun(#proc_int{ddoc_keys = DDocKeys}) -> - lists:member(DDocKey, DDocKeys) - end, - case find_proc(Lang, Pred) of +find_proc(#client{ddoc_key = undefined} = Client, _CanSpawn) -> + #client{lang = Lang} = Client, + % Find an unowned process first, if that fails find an owned one + case find_proc(Lang, undefined, '_') of + {ok, Proc} -> + {ok, Proc}; not_found -> - case find_proc(Client#client{ddoc_key=undefined}) of + case find_proc(Lang, '_', '_') of {ok, Proc} -> - teach_ddoc(DDoc, DDocKey, Proc); + {ok, Proc}; Else -> Else end; Else -> Else + end; +find_proc(#client{} = Client, CanSpawn) -> + #client{ + lang = Lang, + ddoc = DDoc, + db_key = DbKey, + ddoc_key = DDocKey + } = Client, + case find_proc(Lang, DbKey, DDocKey) of + not_found -> + % Find a ddoc process used by the same db at least + case find_proc(Lang, DbKey, '_') of + {ok, Proc} -> + teach_ddoc(DDoc, DbKey, DDocKey, Proc); + not_found -> + % Pick a process not used by any ddoc + case find_proc(Lang, undefined, '_') of + {ok, Proc} -> + replenish_untagged_pool(Lang, CanSpawn), + teach_ddoc(DDoc, DbKey, DDocKey, Proc); + Else -> + Else + end; + Else -> + Else + end; + {ok, Proc} -> + {ok, Proc}; + Else -> + Else end. -find_proc(Lang, Fun) -> - try iter_procs(Lang, Fun) - catch ?STACKTRACE(error, Reason, StackTrace) - couch_log:error("~p ~p ~p", [?MODULE, Reason, StackTrace]), - {error, Reason} - end. - - -iter_procs(Lang, Fun) when is_binary(Lang) -> - Pattern = #proc_int{lang=Lang, client=undefined, _='_'}, - MSpec = [{Pattern, [], ['$_']}], - case ets:select_reverse(?PROCS, MSpec, 25) of - '$end_of_table' -> - not_found; - Continuation -> - iter_procs_int(Continuation, Fun) - end. - +find_proc(Lang, DbPat, DDocKey) when + DbPat =:= '_' orelse DbPat =:= undefined orelse is_binary(DbPat), + DDocKey =:= '_' orelse is_tuple(DDocKey) +-> + Pattern = {{Lang, DbPat, '$1'}, '$2'}, + Guards = + case DDocKey of + '_' -> []; + {_, _} -> [{map_get, {const, DDocKey}, '$2'}] + end, + MSpec = [{Pattern, Guards, ['$1']}], + case ets:select_reverse(?IDLE_BY_DB, MSpec, 1) of -iter_procs_int({[], Continuation0}, Fun) -> - case ets:select_reverse(Continuation0) of '$end_of_table' -> not_found; - Continuation1 -> - iter_procs_int(Continuation1, Fun) - end; -iter_procs_int({[Proc | Rest], Continuation}, Fun) -> - case Fun(Proc) of - true -> - {ok, Proc}; - false -> - iter_procs_int({Rest, Continuation}, Fun) + {[Pid], _Continuation} when is_pid(Pid) -> + [#proc{client = undefined} = Proc] = ets:lookup(?PROCS, Pid), + % Once it's found it's not idle any longer and it might be + % "tought" a new ddoc, so its db_key might change + remove_idle_by_db(Proc), + remove_idle_access(Proc), + {ok, Proc} end. - -spawn_proc(State, Client) -> +spawn_proc(#client{} = Client) -> Pid = spawn_link(?MODULE, new_proc, [Client]), ets:insert(?OPENING, {Pid, Client}), - Counts = State#state.counts, - Lang = Client#client.lang, - State#state{ - counts = dict:update_counter(Lang, 1, Counts) - }. - + inc_count(Client#client.lang). +% This instance was spawned without a client to replenish +% the untagged pool asynchronously +new_proc(#client{from = undefined} = Client) -> + #client{lang = Lang} = Client, + Resp = try + case new_proc_int(undefined, Lang) of + {ok, Proc} -> + {spawn_ok, Proc, undefined}; + _Error -> + spawn_error + end + catch _:_ -> + spawn_error + end, + exit(Resp); new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) -> #client{from=From, lang=Lang} = Client, Resp = try @@ -365,13 +419,17 @@ new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) -> spawn_error end, exit(Resp); - -new_proc(Client) -> - #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client, - Resp = try - case new_proc_int(From, Lang) of +new_proc(#client{} = Client) -> + #client{ + from = From, + lang = Lang, + ddoc = DDoc, + db_key = DbKey, + ddoc_key = DDocKey + } = Client, + Resp = try case new_proc_int(From, Lang) of {ok, NewProc} -> - {ok, Proc} = teach_ddoc(DDoc, DDocKey, NewProc), + {ok, Proc} = teach_ddoc(DDoc, DbKey, DDocKey, NewProc), {spawn_ok, Proc, From}; Error -> gen_server:reply(From, {error, Error}), @@ -382,6 +440,64 @@ new_proc(Client) -> end, exit(Resp). +replenish_untagged_pool(Lang, _CanSpawn = true) -> + % After an untagged instance is tagged, we try to replenish + % the untagged pool asynchronously. Here we are using a "bogus" + % #client{} with an undefined from field. + ok = spawn_proc(#client{lang = Lang, from = undefined}); +replenish_untagged_pool(_Lang, _CanSpawn = false) -> + ok. + +reap_idle(Num, <<_/binary>> = Lang) when is_integer(Num), Num >= 1 -> + case ets:match_object(?IDLE_ACCESS, {{Lang, '_', '_'}, '_'}, Num) of + '$end_of_table' -> + 0; + {Objects = [_ | _], _} -> + ok = reap_idle(Objects), + length(Objects) + end. + +reap_idle([]) -> + ok; +reap_idle([{{_Lang, _Ts, Pid}, true} | Rest]) -> + case ets:lookup(?PROCS, Pid) of + % Do an extra assert that client is undefined + [#proc{client = undefined} = Proc] -> + ok = remove_proc(Proc); + [] -> + ok + end, + reap_idle(Rest). + +insert_idle_access(#proc{db_key = undefined}, _Ts) -> + % Only tagged proc are index in ?IDLE_ACCESS + ok; +insert_idle_access(#proc{db_key = <<_/binary>>} = Proc, Ts) -> + #proc{lang = Lang, pid = Pid} = Proc, + % Lang is used for partially bound key access + % Pid is for uniqueness as time is not strictly monotonic + true = ets:insert_new(?IDLE_ACCESS, {{Lang, Ts, Pid}, true}), + ok. + +remove_idle_access(#proc{db_key = undefined}) -> + % Only tagged procs are indexed in ?IDLE_ACCESS + ok; +remove_idle_access(#proc{db_key = <<_/binary>>} = Proc) -> + #proc{last_use_ts = Ts, lang = Lang, pid = Pid} = Proc, + true = ets:delete(?IDLE_ACCESS, {Lang, Ts, Pid}), + ok. + +insert_idle_by_db(#proc{} = Proc) -> + #proc{lang = Lang, pid = Pid, db_key = Db, ddoc_keys = #{} = DDocs} = Proc, + % An extra assert that only expect to insert a new object + true = ets:insert_new(?IDLE_BY_DB, {{Lang, Db, Pid}, DDocs}), + ok. + +remove_idle_by_db(#proc{} = Proc) -> + #proc{lang = Lang, pid = Pid, db_key = Db} = Proc, + true = ets:delete(?IDLE_BY_DB, {Lang, Db, Pid}), + ok. + split_string_if_longer(String, Pos) -> case length(String) > Pos of true -> lists:split(Pos, String); @@ -435,7 +551,10 @@ new_proc_int(From, Lang) when is_binary(Lang) -> LangStr = binary_to_list(Lang), case get_query_server(LangStr) of undefined -> - gen_server:reply(From, {unknown_query_language, Lang}); + case From of + undefined -> ok; + {_, _} -> gen_server:reply(From, {unknown_query_language, Lang}) + end; {M, F, A} -> {ok, Pid} = apply(M, F, A), make_proc(Pid, Lang, M); @@ -444,102 +563,98 @@ new_proc_int(From, Lang) when is_binary(Lang) -> make_proc(Pid, Lang, couch_os_process) end. - -teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc_int{ddoc_keys=Keys}=Proc) -> +teach_ddoc(DDoc, DbKey, {DDocId, _Rev} = DDocKey, #proc{ddoc_keys = #{} = Keys} = Proc) -> % send ddoc over the wire % we only share the rev with the client we know to update code % but it only keeps the latest copy, per each ddoc, around. - true = couch_query_servers:proc_prompt( - export_proc(Proc), - [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]), + JsonDoc = couch_doc:to_json_obj(DDoc, []), + Prompt = [<<"ddoc">>, <<"new">>, DDocId, JsonDoc], + true = couch_query_servers:proc_prompt(Proc, Prompt), % we should remove any other ddocs keys for this docid % because the query server overwrites without the rev - Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId], + Keys2 = maps:filter(fun({Id, _}, true) -> Id =/= DDocId end, Keys), + % add ddoc to the proc - {ok, Proc#proc_int{ddoc_keys=[DDocKey|Keys2]}}. + {ok, Proc#proc{db_key = DbKey, ddoc_keys = Keys2#{DDocKey => true}}}. make_proc(Pid, Lang, Mod) when is_binary(Lang) -> - Proc = #proc_int{ + Proc = #proc{ lang = Lang, pid = Pid, prompt_fun = {Mod, prompt}, set_timeout_fun = {Mod, set_timeout}, - stop_fun = {Mod, stop} + stop_fun = {Mod, stop}, + threshold_ts = timestamp(), + last_use_ts = timestamp() }, unlink(Pid), {ok, Proc}. - -assign_proc(Pid, #proc_int{client=undefined}=Proc0) when is_pid(Pid) -> - Proc = Proc0#proc_int{client = erlang:monitor(process, Pid)}, +assign_proc(Pid, #proc{client = undefined} = Proc0) when is_pid(Pid) -> + Proc = Proc0#proc{client = erlang:monitor(process, Pid)}, + % It's important to insert the proc here instead of doing an update_element + % as we might have updated the db_key or ddoc_keys in teach_ddoc/4 ets:insert(?PROCS, Proc), - export_proc(Proc); -assign_proc(#client{}=Client, #proc_int{client=undefined}=Proc) -> + Proc; +assign_proc(#client{} = Client, #proc{client = undefined} = Proc) -> {Pid, _} = Client#client.from, assign_proc(Pid, Proc). - -return_proc(#state{} = State, #proc_int{} = ProcInt) -> - #proc_int{pid = Pid, lang = Lang} = ProcInt, - NewState = case is_process_alive(Pid) of true -> - case ProcInt#proc_int.t0 < State#state.threshold_ts of - true -> - remove_proc(State, ProcInt); - false -> - gen_server:cast(Pid, garbage_collect), - true = ets:update_element(?PROCS, Pid, [ - {#proc_int.client, undefined} - ]), - State - end; - false -> - remove_proc(State, ProcInt) +return_proc(#state{} = State, #proc{} = Proc) -> + #proc{pid = Pid, lang = Lang} = Proc, + case is_process_alive(Pid) of + true -> + case Proc#proc.threshold_ts < State#state.threshold_ts of + true -> + ok = remove_proc(Proc); + false -> + gen_server:cast(Pid, garbage_collect), + Ts = timestamp(), + true = ets:update_element(?PROCS, Pid, [ + {#proc.client, undefined}, + {#proc.last_use_ts, Ts} + ]), + Proc1 = Proc#proc{client = undefined, last_use_ts = Ts}, + insert_idle_access(Proc1, Ts), + insert_idle_by_db(Proc1) + end; + false -> + ok = remove_proc(Proc) end, - flush_waiters(NewState, Lang). - - -remove_proc(State, #proc_int{}=Proc) -> - ets:delete(?PROCS, Proc#proc_int.pid), - case is_process_alive(Proc#proc_int.pid) of true -> - unlink(Proc#proc_int.pid), - gen_server:cast(Proc#proc_int.pid, stop); - false -> - ok + ok = flush_waiters(State, Lang). + +remove_proc(#proc{pid = Pid} = Proc) -> + remove_idle_access(Proc), + remove_idle_by_db(Proc), + ets:delete(?PROCS, Pid), + case is_process_alive(Pid) of + true-> + unlink(Pid), + gen_server:cast(Pid, stop); + false -> + ok end, - Counts = State#state.counts, - Lang = Proc#proc_int.lang, - State#state{ - counts = dict:update_counter(Lang, -1, Counts) - }. - - --spec export_proc(#proc_int{}) -> #proc{}. -export_proc(#proc_int{} = ProcInt) -> - ProcIntList = tuple_to_list(ProcInt), - ProcLen = record_info(size, proc), - [_ | Data] = lists:sublist(ProcIntList, ProcLen), - list_to_tuple([proc | Data]). - - -flush_waiters(State) -> - dict:fold(fun(Lang, Count, StateAcc) -> - case Count < State#state.hard_limit of - true -> - flush_waiters(StateAcc, Lang); - false -> - StateAcc - end - end, State, State#state.counts). + dec_count(Proc#proc.lang). -flush_waiters(State, Lang) -> - CanSpawn = can_spawn(State, Lang), + +flush_waiters(#state{} = State, Lang) -> + #state{hard_limit = HardLimit, config = {[_ | _] = Cfg}} = State, + TimeoutMSec = couch_util:get_value(<<"timeout">>, Cfg), + Timeout = erlang:convert_time_unit(TimeoutMSec, millisecond, native), + StaleLimit = timestamp() - Timeout, case get_waiting_client(Lang) of + #client{wait_key = {_, T, _}} = Client when is_integer(T), T < StaleLimit -> + % Client waited too long and the gen_server call timeout + % likey fired already, don't bother allocating a process for it + remove_waiting_client(Client), + flush_waiters(State, Lang); #client{from = From} = Client -> - case find_proc(Client) of - {ok, ProcInt} -> - Proc = assign_proc(Client, ProcInt), + CanSpawn = get_count(Lang) < HardLimit, + case find_proc(Client, CanSpawn) of + {ok, Proc0} -> + Proc = assign_proc(Client, Proc0), gen_server:reply(From, {ok, Proc, State#state.config}), remove_waiting_client(Client), flush_waiters(State, Lang); @@ -548,44 +663,56 @@ flush_waiters(State, Lang) -> remove_waiting_client(Client), flush_waiters(State, Lang); not_found when CanSpawn -> - NewState = spawn_proc(State, Client), + ok = spawn_proc(Client), remove_waiting_client(Client), - flush_waiters(NewState, Lang); + flush_waiters(State, Lang); not_found -> - State + % 10% of limit + ReapBatch = round(HardLimit * 0.1 + 1), + case reap_idle(ReapBatch, Lang) of + N when is_integer(N), N > 0 -> + % We may have room available to spawn + case get_count(Lang) < HardLimit of + true -> + ok = spawn_proc(Client), + remove_waiting_client(Client), + flush_waiters(State, Lang); + false -> + ok + end; + 0 -> + ok + end end; undefined -> - State + ok end. - -add_waiting_client(Client) -> - ets:insert(?WAITERS, Client#client{timestamp=os:timestamp()}). +add_waiting_client(#client{from = {_Pid, Tag}, lang = Lang} = Client) -> + % Use Lang in the key first since we can look it up using a partially bound + % in get_waiting_client/2. Use the reply tag to provide uniqueness. + Key = {Lang, timestamp(), Tag}, + true = ets:insert_new(?WAITERS, Client#client{wait_key = Key}). -spec get_waiting_client(Lang :: binary()) -> undefined | #client{}. get_waiting_client(Lang) -> - case ets:match_object(?WAITERS, #client{lang=Lang, _='_'}, 1) of + % Use a partially bound key (Lang) to avoid scanning unrelated procs + Key = {Lang, '_', '_'}, + case ets:match_object(?WAITERS, #client{wait_key = Key, _ = '_'}, 1) of '$end_of_table' -> undefined; {[#client{}=Client], _} -> Client end. +remove_waiting_client(#client{wait_key = Key}) -> + ets:delete(?WAITERS, Key). -remove_waiting_client(#client{timestamp = Timestamp}) -> - ets:delete(?WAITERS, Timestamp). - - -can_spawn(#state{hard_limit = HardLimit, counts = Counts}, Lang) -> - case dict:find(Lang, Counts) of - {ok, Count} -> Count < HardLimit; - error -> true - end. get_proc_config() -> Limit = config:get_boolean("query_server_config", "reduce_limit", true), - Timeout = config:get_integer("couchdb", "os_process_timeout", 5000), + Timeout = get_os_process_timeout(), {[ {<<"reduce_limit">>, Limit}, {<<"timeout">>, Timeout} @@ -593,9 +720,37 @@ get_proc_config() -> get_hard_limit() -> - LimStr = config:get("query_server_config", "os_process_limit", "100"), - list_to_integer(LimStr). + config:get_integer("query_server_config", "os_process_limit", 100). get_soft_limit() -> config:get_integer("query_server_config", "os_process_soft_limit", 100). + +get_os_process_timeout() -> + config:get_integer("couchdb", "os_process_timeout", 5000). + +timestamp() -> + erlang:monotonic_time(). + +foreach_proc(Fun) when is_function(Fun, 1) -> + FoldFun = fun(#proc{} = Proc, ok) -> + Fun(Proc), + ok + end, + ok = ets:foldl(FoldFun, ok, ?PROCS). + +inc_count(Lang) -> + ets:update_counter(?COUNTERS, Lang, 1, {Lang, 0}), + ok. + +dec_count(Lang) -> + ets:update_counter(?COUNTERS, Lang, -1, {Lang, 0}), + ok. + +get_count(Lang) -> + case ets:lookup(?COUNTERS, Lang) of + [{_, Count}] when is_integer(Count) -> + Count; + [] -> + 0 + end. diff --git a/src/couch/src/couch_query_servers.erl b/src/couch/src/couch_query_servers.erl index 10b8048dd..43a1b7fa6 100644 --- a/src/couch/src/couch_query_servers.erl +++ b/src/couch/src/couch_query_servers.erl @@ -14,16 +14,16 @@ -export([try_compile/4]). -export([start_doc_map/3, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]). --export([reduce/3, rereduce/3,validate_doc_update/5]). +-export([reduce/3, rereduce/3, validate_doc_update/6]). -export([filter_docs/5]). --export([filter_view/3]). +-export([filter_view/4]). -export([finalize/2]). -export([rewrite/3]). --export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]). +-export([with_ddoc_proc/3, proc_prompt/2, ddoc_prompt/4, ddoc_proc_prompt/3, json_doc/1]). % For 210-os-proc-pool.t --export([get_os_process/1, get_ddoc_process/2, ret_os_process/1]). +-export([get_os_process/1, get_ddoc_process/3, ret_os_process/1]). -include_lib("couch/include/couch_db.hrl"). @@ -355,17 +355,19 @@ approx_count_distinct(rereduce, Reds) -> hyper:union([Filter || [_, Filter] <- Reds]). % use the function stored in ddoc.validate_doc_update to test an update. --spec validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> ok when +-spec validate_doc_update(Db, DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> ok when + Db :: term(), DDoc :: ddoc(), EditDoc :: doc(), DiskDoc :: doc() | nil, Ctx :: user_ctx(), SecObj :: sec_obj(). -validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> +validate_doc_update(Db, DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]), JsonDiskDoc = json_doc(DiskDoc), Resp = ddoc_prompt( + Db, DDoc, [<<"validate_doc_update">>], [JsonEditDoc, JsonDiskDoc, Ctx, SecObj] @@ -392,7 +394,7 @@ rewrite(Req, Db, DDoc) -> F =/= <<"info">>, F =/= <<"form">>, F =/= <<"uuid">>, F =/= <<"id">>], JsonReq = chttpd_external:json_req_obj(Req, Db, null, Fields), - case couch_query_servers:ddoc_prompt(DDoc, [<<"rewrites">>], [JsonReq]) of + case ddoc_prompt(Db, DDoc, [<<"rewrites">>], [JsonReq]) of {[{<<"forbidden">>, Message}]} -> throw({forbidden, Message}); {[{<<"unauthorized">>, Message}]} -> @@ -480,10 +482,10 @@ json_doc(nil, _) -> json_doc(Doc, Options) -> couch_doc:to_json_obj(Doc, Options). -filter_view(DDoc, VName, Docs) -> +filter_view(Db, DDoc, VName, Docs) -> Options = json_doc_options(), JsonDocs = [json_doc(Doc, Options) || Doc <- Docs], - [true, Passes] = ddoc_prompt(DDoc, [<<"views">>, VName, <<"map">>], [JsonDocs]), + [true, Passes] = ddoc_prompt(Db, DDoc, [<<"views">>, VName, <<"map">>], [JsonDocs]), {ok, Passes}. filter_docs(Req, Db, DDoc, FName, Docs) -> @@ -496,33 +498,34 @@ filter_docs(Req, Db, DDoc, FName, Docs) -> Options = json_doc_options(), JsonDocs = [json_doc(Doc, Options) || Doc <- Docs], try - {ok, filter_docs_int(DDoc, FName, JsonReq, JsonDocs)} + {ok, filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs)} catch throw:{os_process_error,{exit_status,1}} -> %% batch used too much memory, retry sequentially. Fun = fun(JsonDoc) -> - filter_docs_int(DDoc, FName, JsonReq, [JsonDoc]) + filter_docs_int(Db, DDoc, FName, JsonReq, [JsonDoc]) end, {ok, lists:flatmap(Fun, JsonDocs)} end. -filter_docs_int(DDoc, FName, JsonReq, JsonDocs) -> - [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName], +filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs) -> + [true, Passes] = ddoc_prompt(Db, DDoc, [<<"filters">>, FName], [JsonDocs, JsonReq]), Passes. ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) -> proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]). -ddoc_prompt(DDoc, FunPath, Args) -> - with_ddoc_proc(DDoc, fun({Proc, DDocId}) -> +ddoc_prompt(Db, DDoc, FunPath, Args) -> + with_ddoc_proc(Db, DDoc, fun({Proc, DDocId}) -> proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]) end). -with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) -> +with_ddoc_proc(Db, #doc{id = DDocId, revs = {Start, [DiskRev | _]}} = DDoc, Fun) -> Rev = couch_doc:rev_to_str({Start, DiskRev}), + DbKey = db_key(Db), DDocKey = {DDocId, Rev}, - Proc = get_ddoc_process(DDoc, DDocKey), + Proc = get_ddoc_process(DDoc, DbKey, DDocKey), try Fun({Proc, DDocId}) of Resp -> ok = ret_os_process(Proc), @@ -532,6 +535,22 @@ with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) -> erlang:raise(Tag, Err, Stack) end. +db_key(DbName) when is_binary(DbName) -> + Name = mem3:dbname(DbName), + case config:get("query_server_config", "db_tag", "name") of + "prefix" -> + case binary:split(Name, <<"/">>) of + [Prefix, _] when byte_size(Prefix) > 0 -> Prefix; + _ -> Name + end; + "none" -> + undefined; + _ -> + Name + end; +db_key(Db) -> + db_key(couch_db:name(Db)). + proc_prompt(Proc, Args) -> case proc_prompt_raw(Proc, Args) of {json, Json} -> @@ -622,12 +641,9 @@ proc_set_timeout(Proc, Timeout) -> {Mod, Func} = Proc#proc.set_timeout_fun, apply(Mod, Func, [Proc#proc.pid, Timeout]). -get_os_process_timeout() -> - config:get_integer("couchdb", "os_process_timeout", 5000). - -get_ddoc_process(#doc{} = DDoc, DDocKey) -> +get_ddoc_process(#doc{} = DDoc, DbKey, DDocKey) -> % remove this case statement - case gen_server:call(couch_proc_manager, {get_proc, DDoc, DDocKey}, get_os_process_timeout()) of + case couch_proc_manager:get_proc(DDoc, DbKey, DDocKey) of {ok, Proc, {QueryConfig}} -> % process knows the ddoc case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of @@ -636,14 +652,14 @@ get_ddoc_process(#doc{} = DDoc, DDocKey) -> Proc; _ -> catch proc_stop(Proc), - get_ddoc_process(DDoc, DDocKey) + get_ddoc_process(DDoc, DbKey, DDocKey) end; Error -> throw(Error) end. get_os_process(Lang) -> - case gen_server:call(couch_proc_manager, {get_proc, Lang}, get_os_process_timeout()) of + case couch_proc_manager:get_proc(Lang) of {ok, Proc, {QueryConfig}} -> case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of true -> @@ -658,7 +674,7 @@ get_os_process(Lang) -> end. ret_os_process(Proc) -> - true = gen_server:call(couch_proc_manager, {ret_proc, Proc}, infinity), + true = couch_proc_manager:ret_proc(Proc), catch unlink(Proc#proc.pid), ok. @@ -670,7 +686,10 @@ throw_stat_error(Else) -> -ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). + +-include_lib("couch/include/couch_eunit.hrl"). + +-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end). builtin_sum_rows_negative_test() -> A = [{[{<<"a">>, 1}]}, {[{<<"a">>, 2}]}, {[{<<"a">>, 3}]}], @@ -799,4 +818,42 @@ force_utf8_test() -> end end, NotOk). +db_key_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_db_key_default), + ?TDEF_FE(t_db_key_prefix), + ?TDEF_FE(t_db_key_none) + ] + }. + +setup() -> + meck:new(config, [passthrough]), + meck:expect(config, get, fun(_, _, Default) -> Default end), + ok. + +teardown(_) -> + meck:unload(). + +t_db_key_default(_) -> + ?assertEqual(<<"foo">>, db_key(<<"foo">>)), + ?assertEqual(<<"foo/bar">>, db_key(<<"foo/bar">>)), + ?assertEqual(<<"foo/bar">>, db_key(<<"shards/00000000-3fffffff/foo/bar.1415960794">>)). + +t_db_key_prefix(_) -> + meck:expect(config, get, fun(_, "db_tag", _) -> "prefix" end), + ?assertEqual(<<"foo">>, db_key(<<"foo">>)), + ?assertEqual(<<"foo">>, db_key(<<"foo/bar">>)), + ?assertEqual(<<"foo">>, db_key(<<"shards/00000000-3fffffff/foo/bar.1415960794">>)), + ?assertEqual(<<"/foo">>, db_key(<<"/foo">>)). + +t_db_key_none(_) -> + meck:expect(config, get, fun(_, "db_tag", _) -> "none" end), + ?assertEqual(undefined, db_key(<<"foo">>)), + ?assertEqual(undefined, db_key(<<"foo/bar">>)), + ?assertEqual(undefined, db_key(<<"shards/00000000-3fffffff/foo/bar.1415960794">>)). + -endif. diff --git a/src/couch/test/eunit/couch_auth_cache_tests.erl b/src/couch/test/eunit/couch_auth_cache_tests.erl index 71faf77d6..ceed68148 100644 --- a/src/couch/test/eunit/couch_auth_cache_tests.erl +++ b/src/couch/test/eunit/couch_auth_cache_tests.erl @@ -345,5 +345,6 @@ validate(DiskDoc, NewDoc) -> validate(DiskDoc, NewDoc, JSONCtx) -> {ok, DDoc0} = couch_auth_cache:auth_design_doc(<<"_design/anything">>), + Db = <<"validate_couch_auth_cache_tests">>, DDoc = DDoc0#doc{revs = {1, [<<>>]}}, - couch_query_servers:validate_doc_update(DDoc, NewDoc, DiskDoc, JSONCtx, []). + couch_query_servers:validate_doc_update(Db, DDoc, NewDoc, DiskDoc, JSONCtx, []). diff --git a/src/couch/test/eunit/couch_query_servers_tests.erl b/src/couch/test/eunit/couch_query_servers_tests.erl index 440fc8e1b..d142498d6 100644 --- a/src/couch/test/eunit/couch_query_servers_tests.erl +++ b/src/couch/test/eunit/couch_query_servers_tests.erl @@ -110,7 +110,7 @@ should_return_object_on_false() -> should_split_large_batches() -> Req = {json_req, {[]}}, - Db = undefined, + Db = <<"somedb">>, DDoc = #doc{ id = <<"_design/foo">>, revs = {0, [<<"bork bork bork">>]}, diff --git a/src/couch/test/eunit/couchdb_os_proc_pool.erl b/src/couch/test/eunit/couchdb_os_proc_pool.erl index b552e114a..645385376 100644 --- a/src/couch/test/eunit/couchdb_os_proc_pool.erl +++ b/src/couch/test/eunit/couchdb_os_proc_pool.erl @@ -15,240 +15,492 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). +-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end). + -define(TIMEOUT, 1000). setup() -> - ok = couch_proc_manager:reload(), + Ctx = test_util:start_couch(), meck:new(couch_os_process, [passthrough]), - ok = setup_config(). + meck:new(couch_proc_manager, [passthrough]), + ok = setup_config(), + Ctx. -teardown(_) -> +teardown(Ctx) -> + ok = teardown_config(), meck:unload(), + test_util:stop_couch(Ctx), ok. os_proc_pool_test_() -> { "OS processes pool tests", { - setup, - fun test_util:start_couch/0, fun test_util:stop_couch/1, - { - foreach, - fun setup/0, fun teardown/1, - [ - should_block_new_proc_on_full_pool(), - should_free_slot_on_proc_unexpected_exit(), - should_reuse_known_proc(), -% should_process_waiting_queue_as_fifo(), - should_reduce_pool_on_idle_os_procs(), - should_not_return_broken_process_to_the_pool() - ] - } + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(should_block_new_proc_on_full_pool), + ?TDEF_FE(should_free_slot_on_proc_unexpected_exit), + ?TDEF_FE(should_reuse_known_proc), + ?TDEF_FE(should_process_waiting_queue_as_fifo), + ?TDEF_FE(should_reduce_pool_on_idle_os_procs), + ?TDEF_FE(should_reduce_pool_of_tagged_processes_on_idle), + ?TDEF_FE(should_not_return_broken_process_to_the_pool), + ?TDEF_FE(oldest_tagged_process_is_reaped), + ?TDEF_FE(untagged_process_is_replenished), + ?TDEF_FE(exact_ddoc_tagged_process_is_picked_first), + ?TDEF_FE(db_tagged_process_is_second_choice), + ?TDEF_FE(if_no_tagged_process_found_new_must_be_spawned), + ?TDEF_FE(db_tag_none_works), + ?TDEF_FE(stale_procs_are_cleaned), + ?TDEF_FE(bad_query_language) + ] } }. +should_block_new_proc_on_full_pool(_) -> + Client1 = spawn_client(), + Client2 = spawn_client(), + Client3 = spawn_client(), + + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + ?assertEqual(ok, ping_client(Client3)), + + Proc1 = get_client_proc(Client1, "1"), + Proc2 = get_client_proc(Client2, "2"), + Proc3 = get_client_proc(Client3, "3"), + + ?assertNotEqual(Proc1, Proc2), + ?assertNotEqual(Proc2, Proc3), + ?assertNotEqual(Proc3, Proc1), + + Client4 = spawn_client(), + ?assertEqual(timeout, ping_client(Client4)), + + ?assertEqual(ok, stop_client(Client1)), + ?assertEqual(ok, ping_client(Client4)), + + Proc4 = get_client_proc(Client4, "4"), + + ?assertEqual(Proc1#proc.pid, Proc4#proc.pid), + ?assertNotEqual(Proc1#proc.client, Proc4#proc.client), + + stop_clients([Client2, Client3, Client4]). + +should_free_slot_on_proc_unexpected_exit(_) -> + Client1 = spawn_client(), + Client2 = spawn_client(), + Client3 = spawn_client(), + + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + ?assertEqual(ok, ping_client(Client3)), + + Proc1 = get_client_proc(Client1, "1"), + Proc2 = get_client_proc(Client2, "2"), + Proc3 = get_client_proc(Client3, "3"), + + ?assertNotEqual(Proc1#proc.pid, Proc2#proc.pid), + ?assertNotEqual(Proc1#proc.client, Proc2#proc.client), + ?assertNotEqual(Proc2#proc.pid, Proc3#proc.pid), + ?assertNotEqual(Proc2#proc.client, Proc3#proc.client), + ?assertNotEqual(Proc3#proc.pid, Proc1#proc.pid), + ?assertNotEqual(Proc3#proc.client, Proc1#proc.client), + + ?assertEqual(ok, kill_client(Client1)), + + Client4 = spawn_client(), + ?assertEqual(ok, ping_client(Client4)), + + Proc4 = get_client_proc(Client4, "4"), + + ?assertEqual(Proc4#proc.pid, Proc1#proc.pid), + ?assertNotEqual(Proc4#proc.client, Proc1#proc.client), + ?assertNotEqual(Proc2#proc.pid, Proc4#proc.pid), + ?assertNotEqual(Proc2#proc.client, Proc4#proc.client), + ?assertNotEqual(Proc3#proc.pid, Proc4#proc.pid), + ?assertNotEqual(Proc3#proc.client, Proc4#proc.client), + + stop_clients([Client2, Client3, Client4]). + +should_reuse_known_proc(_) -> + Db = <<"db">>, + Client1 = spawn_client(Db, <<"ddoc1">>), + Client2 = spawn_client(Db, <<"ddoc2">>), + + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + + Proc1 = get_client_proc(Client1, "1"), + Proc2 = get_client_proc(Client2, "2"), + ?assertNotEqual(Proc1#proc.pid, Proc2#proc.pid), + + ?assertEqual(ok, stop_client(Client1)), + ?assertEqual(ok, stop_client(Client2)), + ?assert(is_process_alive(Proc1#proc.pid)), + ?assert(is_process_alive(Proc2#proc.pid)), + + Client1Again = spawn_client(Db, <<"ddoc1">>), + ?assertEqual(ok, ping_client(Client1Again)), + Proc1Again = get_client_proc(Client1Again, "1-again"), + ?assertEqual(Proc1#proc.pid, Proc1Again#proc.pid), + ?assertNotEqual(Proc1#proc.client, Proc1Again#proc.client), + ?assertEqual(ok, stop_client(Client1Again)). -should_block_new_proc_on_full_pool() -> - ?_test(begin - Client1 = spawn_client(), - Client2 = spawn_client(), - Client3 = spawn_client(), +should_process_waiting_queue_as_fifo(_) -> + Db = <<"db">>, + meck:reset(couch_proc_manager), + Client1 = spawn_client(Db, <<"ddoc1">>), + meck:wait(1, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000), + Client2 = spawn_client(Db, <<"ddoc2">>), + meck:wait(2, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000), + Client3 = spawn_client(Db, <<"ddoc3">>), + meck:wait(3, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000), + Client4 = spawn_client(Db, <<"ddoc4">>), + meck:wait(4, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000), + Client5 = spawn_client(Db, <<"ddoc5">>), + meck:wait(5, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000), - ?assertEqual(ok, ping_client(Client1)), - ?assertEqual(ok, ping_client(Client2)), - ?assertEqual(ok, ping_client(Client3)), + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + ?assertEqual(ok, ping_client(Client3)), + ?assertEqual(timeout, ping_client(Client4)), + ?assertEqual(timeout, ping_client(Client5)), - Proc1 = get_client_proc(Client1, "1"), - Proc2 = get_client_proc(Client2, "2"), - Proc3 = get_client_proc(Client3, "3"), + Proc1 = get_client_proc(Client1, "1"), + ?assertEqual(ok, stop_client(Client1)), + ?assertEqual(ok, ping_client(Client4)), + Proc4 = get_client_proc(Client4, "4"), - ?assertNotEqual(Proc1, Proc2), - ?assertNotEqual(Proc2, Proc3), - ?assertNotEqual(Proc3, Proc1), + ?assertNotEqual(Proc4#proc.client, Proc1#proc.client), + ?assertEqual(Proc1#proc.pid, Proc4#proc.pid), + ?assertEqual(timeout, ping_client(Client5)), - Client4 = spawn_client(), - ?assertEqual(timeout, ping_client(Client4)), + ?assertEqual(ok, stop_client(Client2)), + ?assertEqual(ok, stop_client(Client3)), + ?assertEqual(ok, stop_client(Client4)), + ?assertEqual(ok, stop_client(Client5)). - ?assertEqual(ok, stop_client(Client1)), - ?assertEqual(ok, ping_client(Client4)), +should_reduce_pool_on_idle_os_procs(_) -> + %% os_process_idle_limit is in sec + cfg_set("os_process_idle_limit", "1"), - Proc4 = get_client_proc(Client4, "4"), + Db = undefined, + Client1 = spawn_client(Db, <<"ddoc1">>), + Client2 = spawn_client(Db, <<"ddoc2">>), + Client3 = spawn_client(Db, <<"ddoc3">>), - ?assertEqual(Proc1#proc.pid, Proc4#proc.pid), - ?assertNotEqual(Proc1#proc.client, Proc4#proc.client), + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + ?assertEqual(ok, ping_client(Client3)), - lists:map(fun(C) -> - ?assertEqual(ok, stop_client(C)) - end, [Client2, Client3, Client4]) - end). + ?assertEqual(3, couch_proc_manager:get_proc_count()), + ?assertEqual(ok, stop_client(Client1)), + ?assertEqual(ok, stop_client(Client2)), + ?assertEqual(ok, stop_client(Client3)), -should_free_slot_on_proc_unexpected_exit() -> - ?_test(begin - Client1 = spawn_client(), - Client2 = spawn_client(), - Client3 = spawn_client(), + % granularity of idle limit is in seconds + timer:sleep(1000), + wait_process_count(1). - ?assertEqual(ok, ping_client(Client1)), - ?assertEqual(ok, ping_client(Client2)), - ?assertEqual(ok, ping_client(Client3)), +should_reduce_pool_of_tagged_processes_on_idle(_) -> + %% os_process_idle_limit is in sec + cfg_set("os_process_idle_limit", "1"), - Proc1 = get_client_proc(Client1, "1"), - Proc2 = get_client_proc(Client2, "2"), - Proc3 = get_client_proc(Client3, "3"), + Db = <<"reduce_pool_on_idle_db">>, + Client1 = spawn_client(Db, <<"ddoc1">>), + Client2 = spawn_client(Db, <<"ddoc2">>), + Client3 = spawn_client(Db, <<"ddoc3">>), - ?assertNotEqual(Proc1#proc.pid, Proc2#proc.pid), - ?assertNotEqual(Proc1#proc.client, Proc2#proc.client), - ?assertNotEqual(Proc2#proc.pid, Proc3#proc.pid), - ?assertNotEqual(Proc2#proc.client, Proc3#proc.client), - ?assertNotEqual(Proc3#proc.pid, Proc1#proc.pid), - ?assertNotEqual(Proc3#proc.client, Proc1#proc.client), + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + ?assertEqual(ok, ping_client(Client3)), - ?assertEqual(ok, kill_client(Client1)), + ?assertEqual(3, couch_proc_manager:get_proc_count()), - Client4 = spawn_client(), - ?assertEqual(ok, ping_client(Client4)), + stop_clients([Client1, Client2, Client3]), - Proc4 = get_client_proc(Client4, "4"), + timer:sleep(1000), + wait_process_count(0). - ?assertEqual(Proc4#proc.pid, Proc1#proc.pid), - ?assertNotEqual(Proc4#proc.client, Proc1#proc.client), - ?assertNotEqual(Proc2#proc.pid, Proc4#proc.pid), - ?assertNotEqual(Proc2#proc.client, Proc4#proc.client), - ?assertNotEqual(Proc3#proc.pid, Proc4#proc.pid), - ?assertNotEqual(Proc3#proc.client, Proc4#proc.client), +oldest_tagged_process_is_reaped(_) -> + Client1 = spawn_client(<<"db1">>, <<"ddoc1">>), + Client2 = spawn_client(<<"db2">>, <<"ddoc1">>), + Client3 = spawn_client(<<"db3">>, <<"ddoc1">>), - lists:map(fun(C) -> - ?assertEqual(ok, stop_client(C)) - end, [Client2, Client3, Client4]) - end). + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + ?assertEqual(ok, ping_client(Client3)), + Proc1 = get_client_proc(Client1, "1"), + Proc2 = get_client_proc(Client2, "2"), + Proc3 = get_client_proc(Client3, "3"), -should_reuse_known_proc() -> - ?_test(begin - Client1 = spawn_client(<<"ddoc1">>), - Client2 = spawn_client(<<"ddoc2">>), + ?assert(all_alive_all_different([Proc1, Proc2, Proc3])), - ?assertEqual(ok, ping_client(Client1)), - ?assertEqual(ok, ping_client(Client2)), + stop_clients([Client1, Client2, Client3]), - Proc1 = get_client_proc(Client1, "1"), - Proc2 = get_client_proc(Client2, "2"), - ?assertNotEqual(Proc1#proc.pid, Proc2#proc.pid), + % All procs should be released back into the pool + wait_tagged_idle_count(3), - ?assertEqual(ok, stop_client(Client1)), - ?assertEqual(ok, stop_client(Client2)), - ?assert(is_process_alive(Proc1#proc.pid)), - ?assert(is_process_alive(Proc2#proc.pid)), + % Processes should be alive + ?assert(all_alive([Proc1, Proc2, Proc3])), - Client1Again = spawn_client(<<"ddoc1">>), - ?assertEqual(ok, ping_client(Client1Again)), - Proc1Again = get_client_proc(Client1Again, "1-again"), - ?assertEqual(Proc1#proc.pid, Proc1Again#proc.pid), - ?assertNotEqual(Proc1#proc.client, Proc1Again#proc.client), - ?assertEqual(ok, stop_client(Client1Again)) - end). - - -%should_process_waiting_queue_as_fifo() -> -% ?_test(begin -% Client1 = spawn_client(<<"ddoc1">>), -% Client2 = spawn_client(<<"ddoc2">>), -% Client3 = spawn_client(<<"ddoc3">>), -% Client4 = spawn_client(<<"ddoc4">>), -% timer:sleep(100), -% Client5 = spawn_client(<<"ddoc5">>), -% -% ?assertEqual(ok, ping_client(Client1)), -% ?assertEqual(ok, ping_client(Client2)), -% ?assertEqual(ok, ping_client(Client3)), -% ?assertEqual(timeout, ping_client(Client4)), -% ?assertEqual(timeout, ping_client(Client5)), -% -% Proc1 = get_client_proc(Client1, "1"), -% ?assertEqual(ok, stop_client(Client1)), -% ?assertEqual(ok, ping_client(Client4)), -% Proc4 = get_client_proc(Client4, "4"), -% -% ?assertNotEqual(Proc4#proc.client, Proc1#proc.client), -% ?assertEqual(Proc1#proc.pid, Proc4#proc.pid), -% ?assertEqual(timeout, ping_client(Client5)), -% -% ?assertEqual(ok, stop_client(Client2)), -% ?assertEqual(ok, stop_client(Client3)), -% ?assertEqual(ok, stop_client(Client4)), -% ?assertEqual(ok, stop_client(Client5)) -% end). + % Spawning a new tagged proc with a different tag should kill + % the oldest unused proc and spawn a new one + Client4 = spawn_client(<<"db4">>, <<"ddoc1">>), + ?assertEqual(ok, ping_client(Client4)), + Proc4 = get_client_proc(Client4, "4"), + + ?assert(all_alive_all_different([Proc2, Proc3, Proc4])), + ?assertNot(is_process_alive(Proc1#proc.pid)), + + ?assertEqual(ok, stop_client(Client4)). + +untagged_process_is_replenished(_) -> + Client1 = spawn_client(), + Client2 = spawn_client(), + + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + + Proc1 = get_client_proc(Client1, "1"), + Proc2 = get_client_proc(Client2, "2"), + + ?assert(all_alive_all_different([Proc1, Proc2])), + + stop_clients([Client1, Client2]), + + % All procs should be released back into the pool + % and they are now all untagged idle + wait_idle_count(2), + ?assertEqual(0, tagged_idle_count()), + + % Processes should still be alive + ?assert(all_alive([Proc1, Proc2])), + + % Spawning a new tagged proc should tag one of the procs + % and also asynchronously replenish the untagged pool + Client3 = spawn_client(<<"db">>, <<"ddoc1">>), + ?assertEqual(ok, ping_client(Client3)), + + % The process is one of the previously untagged ones + Proc3 = get_client_proc(Client3, "3"), + Pid3 = Proc3#proc.pid, + ?assert(lists:member(Pid3, proc_pids([Proc1, Proc2]))), + + % wait for replinishment + wait_idle_count(2), + + ?assertEqual(ok, stop_client(Client3)). + +exact_ddoc_tagged_process_is_picked_first(_) -> + Client1 = spawn_client(<<"db">>, <<"ddoc1">>), + Client2 = spawn_client(<<"db">>, <<"ddoc2">>), + + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + + Proc1 = get_client_proc(Client1, "1"), + Proc2 = get_client_proc(Client2, "2"), + + ?assert(all_alive_all_different([Proc1, Proc2])), + + stop_clients([Client1, Client2]), + + % All procs should be released back into the pool + % and they now tagged and idle + wait_tagged_idle_count(2), + wait_idle_count(2), + + % Processes should still be alive + ?assert(all_alive([Proc1, Proc2])), + + % Spawning a new tagged proc should pick the one with + % matching ddoc + Client3 = spawn_client(<<"db">>, <<"ddoc1">>), + ?assertEqual(ok, ping_client(Client3)), + Proc3 = get_client_proc(Client3, "3"), + ?assertEqual(Proc1#proc.pid, Proc3#proc.pid), + + ?assertEqual(ok, stop_client(Client3)). + +db_tagged_process_is_second_choice(_) -> + Client1 = spawn_client(<<"db1">>, <<"ddoc1">>), + Client2 = spawn_client(<<"db2">>, <<"ddoc2">>), + + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + + Proc1 = get_client_proc(Client1, "1"), + Proc2 = get_client_proc(Client2, "2"), + ?assert(all_alive_all_different([Proc1, Proc2])), -should_reduce_pool_on_idle_os_procs() -> - ?_test(begin - %% os_process_idle_limit is in sec - config:set("query_server_config", - "os_process_idle_limit", "1", false), - ok = confirm_config("os_process_idle_limit", "1"), + stop_clients([Client1, Client2]), - Client1 = spawn_client(<<"ddoc1">>), - Client2 = spawn_client(<<"ddoc2">>), - Client3 = spawn_client(<<"ddoc3">>), + % All procs should be released back into the pool + % and they now tagged and idle + wait_tagged_idle_count(2), + wait_idle_count(2), - ?assertEqual(ok, ping_client(Client1)), - ?assertEqual(ok, ping_client(Client2)), - ?assertEqual(ok, ping_client(Client3)), + % Processes should still be alive + ?assert(all_alive([Proc1, Proc2])), - ?assertEqual(3, couch_proc_manager:get_proc_count()), + % Spawning a new tagged proc should pick the one with + % the matching ddoc + Client3 = spawn_client(<<"db1">>, <<"ddoc3">>), + ?assertEqual(ok, ping_client(Client3)), + Proc3 = get_client_proc(Client3, "3"), + ?assertEqual(Proc1#proc.pid, Proc3#proc.pid), - ?assertEqual(ok, stop_client(Client1)), - ?assertEqual(ok, stop_client(Client2)), - ?assertEqual(ok, stop_client(Client3)), + ?assertEqual(ok, stop_client(Client3)). - timer:sleep(1200), - ?assertEqual(1, couch_proc_manager:get_proc_count()) - end). +if_no_tagged_process_found_new_must_be_spawned(_) -> + Client1 = spawn_client(<<"db1">>, <<"ddoc">>), + Client2 = spawn_client(<<"db2">>, <<"ddoc">>), + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), -should_not_return_broken_process_to_the_pool() -> - ?_test(begin - config:set("query_server_config", - "os_process_soft_limit", "1", false), - ok = confirm_config("os_process_soft_limit", "1"), + Proc1 = get_client_proc(Client1, "1"), + Proc2 = get_client_proc(Client2, "2"), - config:set("query_server_config", - "os_process_limit", "1", false), - ok = confirm_config("os_process_limit", "1"), + ?assert(all_alive_all_different([Proc1, Proc2])), - DDoc1 = ddoc(<<"_design/ddoc1">>), + stop_clients([Client1, Client2]), - meck:reset(couch_os_process), + % All procs should be released back into the pool + % and they now tagged and idle + wait_tagged_idle_count(2), + wait_idle_count(2), - ?assertEqual(0, couch_proc_manager:get_proc_count()), - ok = couch_query_servers:with_ddoc_proc(DDoc1, fun(_) -> ok end), - ?assertEqual(0, meck:num_calls(couch_os_process, stop, 1)), - ?assertEqual(1, couch_proc_manager:get_proc_count()), + % Processes should still be alive + ?assert(all_alive([Proc1, Proc2])), - ?assertError(bad, couch_query_servers:with_ddoc_proc(DDoc1, fun(_) -> + % If new tagged process with new db should spawn + % new process never pick up an existing one + Client3 = spawn_client(<<"db3">>, <<"ddoc">>), + ?assertEqual(ok, ping_client(Client3)), + Proc3 = get_client_proc(Client3, "3"), + ?assertNotEqual(Proc1#proc.pid, Proc3#proc.pid), + ?assertNotEqual(Proc2#proc.pid, Proc3#proc.pid), + + % db1 and db2 procs should still be sitting idle + ?assertEqual(2, tagged_idle_count()), + + % After 3rd proc returns to the pool there should + % be 3 tagged idle processes + ?assertEqual(ok, stop_client(Client3)), + wait_tagged_idle_count(3), + ?assertEqual(3, idle_count()). + +db_tag_none_works(_) -> + cfg_set("db_tag", "none"), + Client1 = spawn_client(undefined, <<"ddoc1">>), + Client2 = spawn_client(undefined, <<"ddoc2">>), + + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + + Proc1 = get_client_proc(Client1, "1"), + Proc2 = get_client_proc(Client2, "2"), + + ?assert(all_alive_all_different([Proc1, Proc2])), + + stop_clients([Client1, Client2]), + + % All procs should be released back into the pool + % they should be untagged effectively + wait_idle_count(2), + ?assertEqual(0, tagged_idle_count()), + + % Processes should still be alive + ?assert(all_alive([Proc1, Proc2])), + + % If new tagged process with new db should spawn + % new process and pick based on ddoc id matching + Client3 = spawn_client(undefined, <<"ddoc1">>), + ?assertEqual(ok, ping_client(Client3)), + Proc3 = get_client_proc(Client3, "3"), + ?assertEqual(Proc1#proc.pid, Proc3#proc.pid), + ?assertNotEqual(Proc2#proc.pid, Proc3#proc.pid), + + wait_idle_count(1), + + % After 3rd client stop there should be 2 idle + % untagged procs + ?assertEqual(ok, stop_client(Client3)), + wait_idle_count(2), + ?assertEqual(0, tagged_idle_count()). + +stale_procs_are_cleaned(_) -> + Client1 = spawn_client(), + Client2 = spawn_client(), + + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + + Proc1 = get_client_proc(Client1, "1"), + Proc2 = get_client_proc(Client2, "2"), + + ?assert(all_alive_all_different([Proc1, Proc2])), + + ?assertEqual(0, couch_proc_manager:get_stale_proc_count()), + ?assertEqual(ok, couch_proc_manager:reload()), + ?assertEqual(2, couch_proc_manager:get_stale_proc_count()), + + stop_clients([Client1, Client2]), + ?assertEqual(ok, couch_proc_manager:terminate_stale_procs()), + wait_idle_count(0), + ?assertEqual(0, couch_proc_manager:get_proc_count()). + +bad_query_language(_) -> + Expect = {unknown_query_language, <<"bad">>}, + ?assertThrow(Expect, couch_query_servers:get_os_process(<<"bad">>)). + +should_not_return_broken_process_to_the_pool(_) -> + cfg_set("os_process_soft_limit", "1"), + cfg_set("os_process_limit", "1"), + + Db = <<"thedb">>, + DDoc1 = ddoc(<<"_design/ddoc1">>), + + meck:reset(couch_os_process), + + ?assertEqual(0, couch_proc_manager:get_proc_count()), + ok = couch_query_servers:with_ddoc_proc(Db, DDoc1, fun(_) -> ok end), + ?assertEqual(0, meck:num_calls(couch_os_process, stop, 1)), + ?assertEqual(1, couch_proc_manager:get_proc_count()), + + ?assertError( + bad, + couch_query_servers:with_ddoc_proc(Db, DDoc1, fun(_) -> error(bad) - end)), - ?assertEqual(1, meck:num_calls(couch_os_process, stop, 1)), - - WaitFun = fun() -> - case couch_proc_manager:get_proc_count() of - 0 -> ok; - N when is_integer(N), N > 0 -> wait - end - end, - case test_util:wait(WaitFun, 5000) of - timeout -> error(timeout); - _ -> ok - end, - ?assertEqual(0, couch_proc_manager:get_proc_count()), - - DDoc2 = ddoc(<<"_design/ddoc2">>), - ok = couch_query_servers:with_ddoc_proc(DDoc2, fun(_) -> ok end), - ?assertEqual(1, meck:num_calls(couch_os_process, stop, 1)), - ?assertEqual(1, couch_proc_manager:get_proc_count()) - end). + end) + ), + ?assertEqual(1, meck:num_calls(couch_os_process, stop, 1)), + + WaitFun = fun() -> + case couch_proc_manager:get_proc_count() of + 0 -> ok; + N when is_integer(N), N > 0 -> wait + end + end, + case test_util:wait(WaitFun, 5000) of + timeout -> error(timeout); + _ -> ok + end, + ?assertEqual(0, couch_proc_manager:get_proc_count()), + + DDoc2 = ddoc(<<"_design/ddoc2">>), + ok = couch_query_servers:with_ddoc_proc(Db, DDoc2, fun(_) -> ok end), + ?assertEqual(1, meck:num_calls(couch_os_process, stop, 1)), + ?assertEqual(1, couch_proc_manager:get_proc_count()). ddoc(DDocId) -> @@ -269,26 +521,14 @@ setup_config() -> config:set("native_query_servers", "enable_erlang_query_server", "true", false), config:set("query_server_config", "os_process_limit", "3", false), config:set("query_server_config", "os_process_soft_limit", "2", false), - ok = confirm_config("os_process_soft_limit", "2"). - -confirm_config(Key, Value) -> - confirm_config(Key, Value, 0). - -confirm_config(Key, Value, Count) -> - case config:get("query_server_config", Key) of - Value -> - ok; - _ when Count > 10 -> - erlang:error({config_setup, [ - {module, ?MODULE}, - {line, ?LINE}, - {value, timeout} - ]}); - _ -> - %% we need to wait to let gen_server:cast finish - timer:sleep(10), - confirm_config(Key, Value, Count + 1) - end. + ok. + +teardown_config() -> + config:delete("native_query_servers", "enable_erlang_query_server", false), + config:delete("query_server_config", "os_process_limit", false), + config:delete("query_server_config", "os_process_soft_limit", false), + config:delete("query_server_config", "db_tag", false), + ok. spawn_client() -> Parent = self(), @@ -299,13 +539,13 @@ spawn_client() -> end), {Pid, Ref}. -spawn_client(DDocId) -> +spawn_client(Db, DDocId) -> Parent = self(), Ref = make_ref(), Pid = spawn(fun() -> DDocKey = {DDocId, <<"1-abcdefgh">>}, DDoc = #doc{body={[{<<"language">>, <<"erlang">>}]}}, - Proc = couch_query_servers:get_ddoc_process(DDoc, DDocKey), + Proc = couch_query_servers:get_ddoc_process(DDoc, Db, DDocKey), loop(Parent, Ref, Proc) end), {Pid, Ref}. @@ -332,20 +572,30 @@ get_client_proc({Pid, Ref}, ClientName) -> end. stop_client({Pid, Ref}) -> + MRef = erlang:monitor(process, Pid), Pid ! stop, receive {stop, Ref} -> + receive + {'DOWN', MRef, process, Pid, _} -> ok + end, ok after ?TIMEOUT -> + erlang:demonitor(MRef, [flush]), timeout end. kill_client({Pid, Ref}) -> + MRef = erlang:monitor(process, Pid), Pid ! die, receive {die, Ref} -> + receive + {'DOWN', MRef, process, Pid, _} -> ok + end, ok after ?TIMEOUT -> + erlang:demonitor(MRef, [flush]), timeout end. @@ -364,3 +614,68 @@ loop(Parent, Ref, Proc) -> Parent ! {die, Ref}, exit(some_error) end. + +proc_pids(Procs) -> + [P#proc.pid || P <- Procs]. + +all_alive(Procs) -> + lists:all(fun is_process_alive/1, proc_pids(Procs)). + +all_different(Procs) -> + lists:usort(proc_pids(Procs)) =:= lists:sort(proc_pids(Procs)). + +all_alive_all_different(Procs) -> + all_alive(Procs) andalso all_different(Procs). + +idle_count() -> + ets:info(couch_proc_manager_idle_by_db, size). + +tagged_idle_count() -> + ets:info(couch_proc_manager_idle_access, size). + +stop_clients(Clients) -> + Fun = fun(C) -> ?assertEqual(ok, stop_client(C)) end, + lists:map(Fun, Clients). + +wait_tagged_idle_count(N) -> + WaitFun = fun() -> + case tagged_idle_count() == N of + true -> ok; + false -> wait + end + end, + case test_util:wait(WaitFun, 5000) of + timeout -> error(timeout); + _ -> ok + end, + ?assertEqual(N, tagged_idle_count()). + +wait_idle_count(N) -> + WaitFun = fun() -> + case idle_count() == N of + true -> ok; + false -> wait + end + end, + case test_util:wait(WaitFun, 5000) of + timeout -> error(timeout); + _ -> ok + end, + ?assertEqual(N, idle_count()). + +wait_process_count(N) -> + WaitFun = fun() -> + case couch_proc_manager:get_proc_count() == N of + true -> ok; + false -> wait + end + end, + case test_util:wait(WaitFun, 5000) of + timeout -> error(timeout); + _ -> ok + end, + ?assertEqual(N, couch_proc_manager:get_proc_count()). + +cfg_set(K, V) -> + config:set("query_server_config", K, V, false), + ok. diff --git a/src/couch_mrview/src/couch_mrview_show.erl b/src/couch_mrview/src/couch_mrview_show.erl index 0268b706e..16a75975a 100644 --- a/src/couch_mrview/src/couch_mrview_show.erl +++ b/src/couch_mrview/src/couch_mrview_show.erl @@ -77,7 +77,7 @@ handle_doc_show(Req, Db, DDoc, ShowName, Doc, DocId) -> JsonReq = chttpd_external:json_req_obj(Req, Db, DocId), JsonDoc = couch_query_servers:json_doc(Doc), [<<"resp">>, ExternalResp] = - couch_query_servers:ddoc_prompt(DDoc, [<<"shows">>, ShowName], + couch_query_servers:ddoc_prompt(Db, DDoc, [<<"shows">>, ShowName], [JsonDoc, JsonReq]), JsonResp = apply_etag(ExternalResp, CurrentEtag), chttpd_external:send_external_response(Req, JsonResp) @@ -122,7 +122,7 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) -> JsonReq = chttpd_external:json_req_obj(Req, Db, DocId), JsonDoc = couch_query_servers:json_doc(Doc), Cmd = [<<"updates">>, UpdateName], - UpdateResp = couch_query_servers:ddoc_prompt(DDoc, Cmd, [JsonDoc, JsonReq]), + UpdateResp = couch_query_servers:ddoc_prompt(Db, DDoc, Cmd, [JsonDoc, JsonReq]), JsonResp = case UpdateResp of [<<"up">>, {NewJsonDoc}, {JsonResp0}] -> case chttpd:header_value( @@ -196,7 +196,7 @@ handle_view_list(Req, Db, DDoc, LName, VDDoc, VName, Keys) -> end, Args = Args0#mrargs{preflight_fun=ETagFun}, couch_httpd:etag_maybe(Req, fun() -> - couch_query_servers:with_ddoc_proc(DDoc, fun(QServer) -> + couch_query_servers:with_ddoc_proc(Db, DDoc, fun(QServer) -> Acc = #lacc{db=Db, req=Req, qserver=QServer, lname=LName}, case VName of <<"_all_docs">> -> diff --git a/src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl b/src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl index 2182dead6..4cbbd382b 100644 --- a/src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl +++ b/src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl @@ -32,7 +32,7 @@ ddocid(_) -> recover(DbName) -> {ok, DDocs} = fabric:design_docs(mem3:dbname(DbName)), Funs = lists:flatmap(fun(DDoc) -> - case couch_doc:get_validate_doc_fun(DDoc) of + case couch_doc:get_validate_doc_fun(DbName, DDoc) of nil -> []; Fun -> [Fun] end |