diff options
authorNick Vatamaniuc <>2023-02-07 02:01:55 -0500
committerNick Vatamaniuc <>2023-04-15 13:37:50 -0400
commit5642c405d1c219bf19576979ba79e6de7b011d00 (patch)
parent83ffba4b3668ed83f0ee570de59dbda88b7b611a (diff)
Improve couch_proc_manager
The main improvement is speeding up process lookup. This should result in improved latency for concurrent requests which quickly acquire and release couchjs processes. Testing with concurrent vdu and map/reduce calls showed a 1.6 -> 6x performance speedup [1]. Previously, couch_proc_manager linearly searched through all the processes and executed a custom callback function for each to match design doc IDs. Instead, use a separate ets table index for idle processes to avoid scanning assigned processes. Use a db tag in addition to a ddoc id to quickly find idle processes. This could improve performance, but if that's not the case, allow configuring the tagging scheme to use a db prefix only, or disable the scheme altogether. Use the new `map_get` ets select guard [2] to perform ddoc id lookups during the ets select traversal without a custom matcher callback. In ordered ets tables use the partially bound key trick [3]. This helps skip scanning processes using a different query language altogether. Waiting clients used `os:timestamp/0` as a unique client identifier. It turns out, `os:timestamp/0` is not guaranteed to be unique and could result in some clients never getting a response. This bug was mostly likely the reason the "fifo client order" test had to be commented out. Fix the issue by using a newer monotonic timestamp function, and for uniqueness add the client's gen_server return tag at the end. Uncomment the previously commented out test so it can hopefully run again. When clients tag a previously untagged process, asynchronously replace the untagged process with a new process. This happens in the background and the client doesn't have to wait for it. When a ddoc tagged process cannot be found, before giving up, stop the oldest unused ddoc processes to allow spawning new fresh ones. To avoid doing a linear scan here, keep a separate `?IDLE_ACCESS` index with an ordered list of idle ddoc proceses sorted by their last usage time. When processes are returned to the pool, quickly respond to the client with an early return, instead of forcing them to wait until we re-insert the process back into the idle ets table. This should improve client latency. If the waiting client list gets long enough, where it waits longer than the gen_server get_proc timeout, do not waste time assigning or spawning a new process for that client, since it already timed-out. When gathering stats, avoid making gen_server calls, at least for the total number of processes spawned metric. Table sizes can be easily computed with `ets:info(Table, size)` from outside the main process. In addition to peformance improvements clean up the couch_proc_manager API by forcing all the calls to go through properly exported functions instead of doing direct gen_server calls. Remove `#proc_int{}` and use only `#proc{}`. The cast to a list/tuple between `#proc_int{}` and `#proc{}` was dangerous and it avoided the compiler checking that we're using the proper fields. Adding an extra field to the record resulted in mis-matched fields being assigned. To simplify the code a bit, keep the per-language count in an ets table. This helps not having to thread the old and updated state everywhere. Everything else was mostly kept in ets tables anyway, so we're staying consistent with that general pattern. Improve test coverage and convert the tests to use the `?TDEF_FE` macro so there is no need for the awkward `?_test(begin ... end)` construct. [1] [2] [3]
16 files changed, 1034 insertions, 496 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 93aa1ca59..9c9167962 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -382,6 +382,13 @@ authentication_db = _users
;query_limit = 268435456
;partition_query_limit = 268435456
+; Configure what to use as the db tag when selecting design doc couchjs
+; processes. The choices are:
+; - name (default) : Use the entire db name
+; - prefix : Use only db prefix before the first "/" character
+; - none : Do not use a db tag at all
+;db_tag = name
; Set to true to disable the "index all fields" text index, which can lead
; to out of memory issues when users have documents with nested array fields.
diff --git a/src/chttpd/src/chttpd_show.erl b/src/chttpd/src/chttpd_show.erl
index c2c37c66d..9391fa4a2 100644
--- a/src/chttpd/src/chttpd_show.erl
+++ b/src/chttpd/src/chttpd_show.erl
@@ -79,7 +79,7 @@ handle_doc_show(Req, Db, DDoc, ShowName, Doc, DocId) ->
JsonReq = chttpd_external:json_req_obj(Req, Db, DocId),
JsonDoc = couch_query_servers:json_doc(Doc),
[<<"resp">>, ExternalResp] =
- couch_query_servers:ddoc_prompt(DDoc, [<<"shows">>, ShowName],
+ couch_query_servers:ddoc_prompt(Db, DDoc, [<<"shows">>, ShowName],
[JsonDoc, JsonReq]),
JsonResp = apply_etag(ExternalResp, CurrentEtag),
chttpd_external:send_external_response(Req, JsonResp)
@@ -124,7 +124,7 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) ->
JsonDoc = couch_query_servers:json_doc(Doc),
Cmd = [<<"updates">>, UpdateName],
W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
- UpdateResp = couch_query_servers:ddoc_prompt(DDoc, Cmd, [JsonDoc, JsonReq]),
+ UpdateResp = couch_query_servers:ddoc_prompt(Db, DDoc, Cmd, [JsonDoc, JsonReq]),
JsonResp = case UpdateResp of
[<<"up">>, {NewJsonDoc}, {JsonResp0}] ->
case chttpd:header_value(Req, "X-Couch-Full-Commit", "false") of
@@ -204,7 +204,7 @@ handle_view_list(Req, Db, DDoc, LName, {ViewDesignName, ViewName}, Keys) ->
CB = fun list_cb/2,
QueryArgs = couch_mrview_http:parse_body_and_query(Req, Keys),
Options = [{user_ctx, Req#httpd.user_ctx}],
- couch_query_servers:with_ddoc_proc(DDoc, fun(QServer) ->
+ couch_query_servers:with_ddoc_proc(Db, DDoc, fun(QServer) ->
Acc = #lacc{
lname = LName,
req = Req,
diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl
index 019c205ab..a15c65039 100644
--- a/src/couch/include/couch_db.hrl
+++ b/src/couch/include/couch_db.hrl
@@ -189,11 +189,14 @@
-record(proc, {
- client = nil,
- ddoc_keys = [],
+ client,
+ db_key,
+ ddoc_keys = #{},
- stop_fun
+ stop_fun,
+ threshold_ts,
+ last_use_ts
-record(leaf, {
diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl
index 2078fed3a..2f0c545a6 100644
--- a/src/couch/src/couch_changes.erl
+++ b/src/couch/src/couch_changes.erl
@@ -233,7 +233,7 @@ filter(_Db, DocInfo, {design_docs, Style}) ->
filter(Db, DocInfo, {view, Style, DDoc, VName}) ->
Docs = open_revs(Db, DocInfo, Style),
- {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
+ {ok, Passes} = couch_query_servers:filter_view(Db, DDoc, VName, Docs),
filter_revs(Passes, Docs);
filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
Req = case Req0 of
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index fdcf23e1b..333ed3683 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -950,7 +950,7 @@ load_validation_funs(#db{main_pid=Pid}=Db) ->
DDocs = lists:map(OpenDocs, DDocInfos),
Funs = lists:flatmap(fun(DDoc) ->
- case couch_doc:get_validate_doc_fun(DDoc) of
+ case couch_doc:get_validate_doc_fun(Db, DDoc) of
nil -> [];
Fun -> [Fun]
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 535acfad6..169fbc14d 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -330,7 +330,7 @@ refresh_validate_doc_funs(Db0) ->
fun(DesignDocInfo) ->
{ok, DesignDoc} = couch_db:open_doc_int(
Db, DesignDocInfo, [ejson_body]),
- case couch_doc:get_validate_doc_fun(DesignDoc) of
+ case couch_doc:get_validate_doc_fun(Db, DesignDoc) of
nil -> [];
Fun -> [Fun]
diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl
index ec16d21db..e9265e7a0 100644
--- a/src/couch/src/couch_doc.erl
+++ b/src/couch/src/couch_doc.erl
@@ -16,7 +16,7 @@
-export([from_json_obj/1, from_json_obj_validate/1]).
-export([from_json_obj/2, from_json_obj_validate/2]).
-export([to_json_obj/2, has_stubs/1, merge_stubs/2]).
--export([validate_docid/1, validate_docid/2, get_validate_doc_fun/1]).
+-export([validate_docid/1, validate_docid/2, get_validate_doc_fun/2]).
-export([doc_from_multi_part_stream/2, doc_from_multi_part_stream/3]).
-export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
@@ -400,15 +400,15 @@ is_deleted(Tree) ->
-get_validate_doc_fun({Props}) ->
- get_validate_doc_fun(couch_doc:from_json_obj({Props}));
-get_validate_doc_fun(#doc{body={Props}}=DDoc) ->
+get_validate_doc_fun(Db, {Props}) ->
+ get_validate_doc_fun(Db, couch_doc:from_json_obj({Props}));
+get_validate_doc_fun(Db, #doc{body={Props}}=DDoc) ->
case couch_util:get_value(<<"validate_doc_update">>, Props) of
undefined ->
_Else ->
fun(EditDoc, DiskDoc, Ctx, SecObj) ->
- couch_query_servers:validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj)
+ couch_query_servers:validate_doc_update(Db, DDoc, EditDoc, DiskDoc, Ctx, SecObj)
diff --git a/src/couch/src/couch_native_process.erl b/src/couch/src/couch_native_process.erl
index eee8b2860..20378bf84 100644
--- a/src/couch/src/couch_native_process.erl
+++ b/src/couch/src/couch_native_process.erl
@@ -115,7 +115,7 @@ handle_cast(_Msg, State) ->
{noreply, State, State#evstate.idle}.
handle_info(timeout, State) ->
- gen_server:cast(couch_proc_manager, {os_proc_idle, self()}),
+ couch_proc_manager:os_proc_idle(self()),
{noreply, State, State#evstate.idle};
handle_info({'EXIT',_,normal}, State) ->
diff --git a/src/couch/src/couch_os_process.erl b/src/couch/src/couch_os_process.erl
index 63a241433..675f10538 100644
--- a/src/couch/src/couch_os_process.erl
+++ b/src/couch/src/couch_os_process.erl
@@ -229,7 +229,7 @@ handle_cast(Msg, #os_proc{idle=Idle}=OsProc) ->
{noreply, OsProc, Idle}.
handle_info(timeout, #os_proc{idle=Idle}=OsProc) ->
- gen_server:cast(couch_proc_manager, {os_proc_idle, self()}),
+ couch_proc_manager:os_proc_idle(self()),
{noreply, OsProc, Idle};
handle_info({Port, {exit_status, 0}}, #os_proc{port=Port}=OsProc) ->
diff --git a/src/couch/src/couch_proc_manager.erl b/src/couch/src/couch_proc_manager.erl
index e7a25a6d2..42c8958f9 100644
--- a/src/couch/src/couch_proc_manager.erl
+++ b/src/couch/src/couch_proc_manager.erl
@@ -17,6 +17,10 @@
+ get_proc/3,
+ get_proc/1,
+ ret_proc/1,
+ os_proc_idle/1,
@@ -45,11 +49,13 @@
-define(WAITERS, couch_proc_manager_waiters).
-define(OPENING, couch_proc_manager_opening).
-define(SERVERS, couch_proc_manager_servers).
+-define(COUNTERS, couch_proc_manager_counters).
+-define(IDLE_BY_DB, couch_proc_manager_idle_by_db).
+-define(IDLE_ACCESS, couch_proc_manager_idle_access).
-define(RELISTEN_DELAY, 5000).
-record(state, {
- counts,
@@ -59,31 +65,44 @@
-type revision() :: {integer(), binary()}.
-record(client, {
- timestamp :: os:timestamp() | '_',
- from :: undefined | {pid(), reference()} | '_',
+ wait_key :: {binary(), integer(), gen_server:reply_tag()} | '_',
+ from :: undefined | {pid(), gen_server:reply_tag()} | '_',
lang :: binary() | '_',
- ddoc :: #doc{} | '_',
+ ddoc :: undefined | #doc{} | '_',
+ db_key :: undefined | binary(),
ddoc_key :: undefined | {DDocId :: docid(), Rev :: revision()} | '_'
--record(proc_int, {
- pid,
- lang,
- client,
- ddoc_keys = [],
- prompt_fun,
- set_timeout_fun,
- stop_fun,
- t0 = os:timestamp()
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+get_proc(#doc{body = {Props}} = DDoc, DbKey, {_DDocId, _Rev} = DDocKey) ->
+ LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+ Lang = couch_util:to_binary(LangStr),
+ Client = #client{lang = Lang, ddoc = DDoc, db_key = DbKey, ddoc_key = DDocKey},
+ Timeout = get_os_process_timeout(),
+ gen_server:call(?MODULE, {get_proc, Client}, Timeout).
+get_proc(LangStr) ->
+ Lang = couch_util:to_binary(LangStr),
+ Client = #client{lang = Lang},
+ Timeout = get_os_process_timeout(),
+ gen_server:call(?MODULE, {get_proc, Client}, Timeout).
+ret_proc(#proc{} = Proc) ->
+ gen_server:call(?MODULE, {ret_proc, Proc}, infinity).
+os_proc_idle(Proc) when is_pid(Proc) ->
+ gen_server:cast(?MODULE, {os_proc_idle, Proc}).
get_proc_count() ->
- gen_server:call(?MODULE, get_proc_count).
+ try
+ ets:info(?PROCS, size) + ets:info(?OPENING, size)
+ catch
+ error:badarg ->
+ 0
+ end.
get_stale_proc_count() ->
@@ -102,11 +121,27 @@ init([]) ->
process_flag(trap_exit, true),
ok = config:listen_for_changes(?MODULE, undefined),
- TableOpts = [public, named_table, ordered_set],
- ets:new(?PROCS, TableOpts ++ [{keypos,}]),
- ets:new(?WAITERS, TableOpts ++ [{keypos, #client.timestamp}]),
- ets:new(?OPENING, [public, named_table, set]),
- ets:new(?SERVERS, [public, named_table, set]),
+ % Main process table. Pid -> #proc{}
+ ets:new(?PROCS, [named_table, {read_concurrency, true}, {keypos,}]),
+ % #client{} waiters ordered by {Lang, timestamp(), Ref}
+ ets:new(?WAITERS, [named_table, ordered_set, {keypos, #client.wait_key}]),
+ % Async process openers. Pid -> #client{}
+ ets:new(?OPENING, [named_table]),
+ % Configured language servers Lang -> Start MFA | Command
+ ets:new(?SERVERS, [named_table]),
+ % Idle Pids. Ordered to allow partial key lookups {Lang, DbKey, Pid} -> DDocs
+ ets:new(?IDLE_BY_DB, [named_table, ordered_set]),
+ % Idle Db tagged pids ordered by last use. {Lang, timestamp(), Pid} -> true
+ ets:new(?IDLE_ACCESS, [named_table, ordered_set]),
+ % Lang -> number of procs spawn for that lang
+ ets:new(?COUNTERS, [named_table]),
ets:insert(?SERVERS, get_servers_from_env("COUCHDB_QUERY_SERVER_")),
ets:insert(?SERVERS, get_servers_from_env("COUCHDB_NATIVE_QUERY_SERVER_")),
ets:insert(?SERVERS, [{"QUERY", {mango_native_proc, start_link, []}}]),
@@ -114,99 +149,82 @@ init([]) ->
{ok, #state{
config = get_proc_config(),
- counts = dict:new(),
- threshold_ts = os:timestamp(),
+ threshold_ts = timestamp(),
hard_limit = get_hard_limit(),
soft_limit = get_soft_limit()
terminate(_Reason, _State) ->
- ets:foldl(fun(#proc_int{pid=P}, _) ->
- couch_util:shutdown_sync(P)
- end, 0, ?PROCS),
- ok.
+ foreach_proc(fun(#proc{pid = P}) -> couch_util:shutdown_sync(P) end).
-handle_call(get_proc_count, _From, State) ->
- NumProcs = ets:info(?PROCS, size),
- NumOpening = ets:info(?OPENING, size),
- {reply, NumProcs + NumOpening, State};
handle_call(get_stale_proc_count, _From, State) ->
#state{threshold_ts = T0} = State,
- MatchSpec = [{#proc_int{t0='$1', _='_'}, [{'<', '$1', {T0}}], [true]}],
+ MatchSpec = [{#proc{threshold_ts = '$1', _ = '_'}, [{'<', '$1', T0}], [true]}],
{reply, ets:select_count(?PROCS, MatchSpec), State};
+handle_call({get_proc, #client{} = Client}, From, State) ->
+ add_waiting_client(Client#client{from = From}),
+ ok = flush_waiters(State, Client#client.lang),
+ {noreply, State};
+handle_call({ret_proc, #proc{} = Proc}, From, State) ->
+ #proc{client = Ref, pid = Pid} = Proc,
-handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
- LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
- Lang = couch_util:to_binary(LangStr),
- Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey},
- add_waiting_client(Client),
- {noreply, flush_waiters(State, Lang)};
-handle_call({get_proc, LangStr}, From, State) ->
- Lang = couch_util:to_binary(LangStr),
- Client = #client{from=From, lang=Lang},
- add_waiting_client(Client),
- {noreply, flush_waiters(State, Lang)};
-handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
erlang:demonitor(Ref, [flush]),
- NewState = case ets:lookup(?PROCS, of
- [#proc_int{}=ProcInt] ->
- return_proc(State, ProcInt);
+ gen_server:reply(From, true),
+ case ets:lookup(?PROCS, Pid) of
+ [#proc{} = ProcInt] ->
+ ok = return_proc(State, ProcInt);
[] ->
% Proc must've died and we already
% cleared it out of the table in
% the handle_info clause.
- State
+ ok
- {reply, true, NewState};
+ {noreply, State};
handle_call(set_threshold_ts, _From, State) ->
- FoldFun = fun
- (#proc_int{client = undefined} = Proc, StateAcc) ->
- remove_proc(StateAcc, Proc);
- (_, StateAcc) ->
- StateAcc
+ Fun = fun
+ (#proc{client = undefined} = Proc) -> ok = remove_proc(Proc);
+ (_) -> ok
- NewState = ets:foldl(FoldFun, State, ?PROCS),
- {reply, ok, NewState#state{threshold_ts = os:timestamp()}};
+ ok = foreach_proc(Fun),
+ {reply, ok, State#state{threshold_ts = timestamp()}};
handle_call(terminate_stale_procs, _From, #state{threshold_ts = Ts1} = State) ->
- FoldFun = fun
- (#proc_int{client = undefined, t0 = Ts2} = Proc, StateAcc) ->
+ Fun = fun
+ (#proc{client = undefined, threshold_ts = Ts2} = Proc) ->
case Ts1 > Ts2 of
- true ->
- remove_proc(StateAcc, Proc);
- false ->
- StateAcc
+ true -> ok = remove_proc(Proc);
+ false -> ok
- (_, StateAcc) ->
- StateAcc
+ (_) ->
+ ok
- NewState = ets:foldl(FoldFun, State, ?PROCS),
- {reply, ok, NewState};
+ foreach_proc(Fun),
+ {reply, ok, State};
handle_call(_Call, _From, State) ->
{reply, ignored, State}.
-handle_cast({os_proc_idle, Pid}, #state{counts=Counts}=State) ->
- NewState = case ets:lookup(?PROCS, Pid) of
- [#proc_int{client=undefined, lang=Lang}=Proc] ->
- case dict:find(Lang, Counts) of
- {ok, Count} when Count >= State#state.soft_limit ->
- couch_log:info("Closing idle OS Process: ~p", [Pid]),
- remove_proc(State, Proc);
- {ok, _} ->
- State
+handle_cast({os_proc_idle, Pid}, #state{soft_limit = SoftLimit} = State) ->
+ case ets:lookup(?PROCS, Pid) of
+ [#proc{client = undefined, db_key = DbKey, lang = Lang} = Proc] ->
+ IsOverSoftLimit = get_count(Lang) >= SoftLimit,
+ IsTagged = DbKey =/= undefined,
+ case IsOverSoftLimit orelse IsTagged of
+ true ->
+ couch_log:debug("Closing tagged or idle OS Process: ~p", [Pid]),
+ ok = remove_proc(Proc);
+ false ->
+ ok
_ ->
- {noreply, NewState};
+ {noreply, State};
handle_cast(reload_config, State) ->
NewState = State#state{
@@ -215,47 +233,59 @@ handle_cast(reload_config, State) ->
soft_limit = get_soft_limit()
- {noreply, flush_waiters(NewState)};
+ lists:foreach(
+ fun({Lang, _}) ->
+ ok = flush_waiters(NewState, Lang)
+ end,
+ ets:tab2list(?COUNTERS)
+ ),
+ {noreply, NewState};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(shutdown, State) ->
{stop, shutdown, State};
+handle_info({'EXIT', Pid, {spawn_ok, Proc0, undefined = _From}}, State) ->
+ % Use ets:take/2 to assert that opener existed before removing. Also assert that
+ % the pid matches and the the client was a bogus client
+ [{Pid, #client{from = undefined}}] = ets:take(?OPENING, Pid),
+ Proc = Proc0#proc{client = undefined},
+ link(,
+ ets:insert(?PROCS, Proc),
+ insert_idle_by_db(Proc),
+ {noreply, State};
handle_info({'EXIT', Pid, {spawn_ok, Proc0, {ClientPid,_} = From}}, State) ->
- ets:delete(?OPENING, Pid),
- link(,
+ % Use ets:take/2 to assert that opener existed before removing
+ [{Pid, #client{}}] = ets:take(?OPENING, Pid),
+ link(,
Proc = assign_proc(ClientPid, Proc0),
gen_server:reply(From, {ok, Proc, State#state.config}),
{noreply, State};
handle_info({'EXIT', Pid, spawn_error}, State) ->
- [{Pid, #client{lang=Lang}}] = ets:lookup(?OPENING, Pid),
- ets:delete(?OPENING, Pid),
- NewState = State#state{
- counts = dict:update_counter(Lang, -1, State#state.counts)
- },
- {noreply, flush_waiters(NewState, Lang)};
+ % Assert when removing that we always expect the opener to have been there
+ [{Pid, #client{lang = Lang}}] = ets:take(?OPENING, Pid),
+ dec_count(Lang),
+ ok = flush_waiters(State, Lang),
+ {noreply, State};
handle_info({'EXIT', Pid, Reason}, State) ->
couch_log:info("~p ~p died ~p", [?MODULE, Pid, Reason]),
case ets:lookup(?PROCS, Pid) of
- [#proc_int{} = Proc] ->
- NewState = remove_proc(State, Proc),
- {noreply, flush_waiters(NewState, Proc#proc_int.lang)};
- [] ->
- {noreply, State}
- end;
-handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
- case ets:match_object(?PROCS, #proc_int{client=Ref, _='_'}) of
- [#proc_int{} = Proc] ->
- {noreply, return_proc(State0, Proc)};
+ [#proc{} = Proc] ->
+ ok = remove_proc(Proc),
+ ok = flush_waiters(State, Proc#proc.lang);
[] ->
- {noreply, State0}
- end;
+ ok
+ end,
+ {noreply, State};
+handle_info({'DOWN', Ref, _, _, _Reason}, #state{} = State) ->
+ case ets:match_object(?PROCS, #proc{client = Ref, _ = '_'}) of
+ [#proc{} = Proc] -> ok = return_proc(State, Proc);
+ [] -> ok
+ end,
+ {noreply, State};
handle_info(restart_config_listener, State) ->
@@ -284,73 +314,97 @@ handle_config_change("query_server_config", _, _, _, _) ->
handle_config_change(_, _, _, _, _) ->
{ok, undefined}.
-find_proc(#client{lang = Lang, ddoc_key = undefined}) ->
- Pred = fun(_) ->
- true
- end,
- find_proc(Lang, Pred);
-find_proc(#client{lang = Lang, ddoc = DDoc, ddoc_key = DDocKey} = Client) ->
- Pred = fun(#proc_int{ddoc_keys = DDocKeys}) ->
- lists:member(DDocKey, DDocKeys)
- end,
- case find_proc(Lang, Pred) of
+find_proc(#client{ddoc_key = undefined} = Client, _CanSpawn) ->
+ #client{lang = Lang} = Client,
+ % Find an unowned process first, if that fails find an owned one
+ case find_proc(Lang, undefined, '_') of
+ {ok, Proc} ->
+ {ok, Proc};
not_found ->
- case find_proc(Client#client{ddoc_key=undefined}) of
+ case find_proc(Lang, '_', '_') of
{ok, Proc} ->
- teach_ddoc(DDoc, DDocKey, Proc);
+ {ok, Proc};
Else ->
Else ->
+ end;
+find_proc(#client{} = Client, CanSpawn) ->
+ #client{
+ lang = Lang,
+ ddoc = DDoc,
+ db_key = DbKey,
+ ddoc_key = DDocKey
+ } = Client,
+ case find_proc(Lang, DbKey, DDocKey) of
+ not_found ->
+ % Find a ddoc process used by the same db at least
+ case find_proc(Lang, DbKey, '_') of
+ {ok, Proc} ->
+ teach_ddoc(DDoc, DbKey, DDocKey, Proc);
+ not_found ->
+ % Pick a process not used by any ddoc
+ case find_proc(Lang, undefined, '_') of
+ {ok, Proc} ->
+ replenish_untagged_pool(Lang, CanSpawn),
+ teach_ddoc(DDoc, DbKey, DDocKey, Proc);
+ Else ->
+ Else
+ end;
+ Else ->
+ Else
+ end;
+ {ok, Proc} ->
+ {ok, Proc};
+ Else ->
+ Else
-find_proc(Lang, Fun) ->
- try iter_procs(Lang, Fun)
- catch ?STACKTRACE(error, Reason, StackTrace)
- couch_log:error("~p ~p ~p", [?MODULE, Reason, StackTrace]),
- {error, Reason}
- end.
-iter_procs(Lang, Fun) when is_binary(Lang) ->
- Pattern = #proc_int{lang=Lang, client=undefined, _='_'},
- MSpec = [{Pattern, [], ['$_']}],
- case ets:select_reverse(?PROCS, MSpec, 25) of
- '$end_of_table' ->
- not_found;
- Continuation ->
- iter_procs_int(Continuation, Fun)
- end.
+find_proc(Lang, DbPat, DDocKey) when
+ DbPat =:= '_' orelse DbPat =:= undefined orelse is_binary(DbPat),
+ DDocKey =:= '_' orelse is_tuple(DDocKey)
+ Pattern = {{Lang, DbPat, '$1'}, '$2'},
+ Guards =
+ case DDocKey of
+ '_' -> [];
+ {_, _} -> [{map_get, {const, DDocKey}, '$2'}]
+ end,
+ MSpec = [{Pattern, Guards, ['$1']}],
+ case ets:select_reverse(?IDLE_BY_DB, MSpec, 1) of
-iter_procs_int({[], Continuation0}, Fun) ->
- case ets:select_reverse(Continuation0) of
'$end_of_table' ->
- Continuation1 ->
- iter_procs_int(Continuation1, Fun)
- end;
-iter_procs_int({[Proc | Rest], Continuation}, Fun) ->
- case Fun(Proc) of
- true ->
- {ok, Proc};
- false ->
- iter_procs_int({Rest, Continuation}, Fun)
+ {[Pid], _Continuation} when is_pid(Pid) ->
+ [#proc{client = undefined} = Proc] = ets:lookup(?PROCS, Pid),
+ % Once it's found it's not idle any longer and it might be
+ % "tought" a new ddoc, so its db_key might change
+ remove_idle_by_db(Proc),
+ remove_idle_access(Proc),
+ {ok, Proc}
-spawn_proc(State, Client) ->
+spawn_proc(#client{} = Client) ->
Pid = spawn_link(?MODULE, new_proc, [Client]),
ets:insert(?OPENING, {Pid, Client}),
- Counts = State#state.counts,
- Lang = Client#client.lang,
- State#state{
- counts = dict:update_counter(Lang, 1, Counts)
- }.
+ inc_count(Client#client.lang).
+% This instance was spawned without a client to replenish
+% the untagged pool asynchronously
+new_proc(#client{from = undefined} = Client) ->
+ #client{lang = Lang} = Client,
+ Resp = try
+ case new_proc_int(undefined, Lang) of
+ {ok, Proc} ->
+ {spawn_ok, Proc, undefined};
+ _Error ->
+ spawn_error
+ end
+ catch _:_ ->
+ spawn_error
+ end,
+ exit(Resp);
new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
#client{from=From, lang=Lang} = Client,
Resp = try
@@ -365,13 +419,17 @@ new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
-new_proc(Client) ->
- #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client,
- Resp = try
- case new_proc_int(From, Lang) of
+new_proc(#client{} = Client) ->
+ #client{
+ from = From,
+ lang = Lang,
+ ddoc = DDoc,
+ db_key = DbKey,
+ ddoc_key = DDocKey
+ } = Client,
+ Resp = try case new_proc_int(From, Lang) of
{ok, NewProc} ->
- {ok, Proc} = teach_ddoc(DDoc, DDocKey, NewProc),
+ {ok, Proc} = teach_ddoc(DDoc, DbKey, DDocKey, NewProc),
{spawn_ok, Proc, From};
Error ->
gen_server:reply(From, {error, Error}),
@@ -382,6 +440,64 @@ new_proc(Client) ->
+replenish_untagged_pool(Lang, _CanSpawn = true) ->
+ % After an untagged instance is tagged, we try to replenish
+ % the untagged pool asynchronously. Here we are using a "bogus"
+ % #client{} with an undefined from field.
+ ok = spawn_proc(#client{lang = Lang, from = undefined});
+replenish_untagged_pool(_Lang, _CanSpawn = false) ->
+ ok.
+reap_idle(Num, <<_/binary>> = Lang) when is_integer(Num), Num >= 1 ->
+ case ets:match_object(?IDLE_ACCESS, {{Lang, '_', '_'}, '_'}, Num) of
+ '$end_of_table' ->
+ 0;
+ {Objects = [_ | _], _} ->
+ ok = reap_idle(Objects),
+ length(Objects)
+ end.
+reap_idle([]) ->
+ ok;
+reap_idle([{{_Lang, _Ts, Pid}, true} | Rest]) ->
+ case ets:lookup(?PROCS, Pid) of
+ % Do an extra assert that client is undefined
+ [#proc{client = undefined} = Proc] ->
+ ok = remove_proc(Proc);
+ [] ->
+ ok
+ end,
+ reap_idle(Rest).
+insert_idle_access(#proc{db_key = undefined}, _Ts) ->
+ % Only tagged proc are index in ?IDLE_ACCESS
+ ok;
+insert_idle_access(#proc{db_key = <<_/binary>>} = Proc, Ts) ->
+ #proc{lang = Lang, pid = Pid} = Proc,
+ % Lang is used for partially bound key access
+ % Pid is for uniqueness as time is not strictly monotonic
+ true = ets:insert_new(?IDLE_ACCESS, {{Lang, Ts, Pid}, true}),
+ ok.
+remove_idle_access(#proc{db_key = undefined}) ->
+ % Only tagged procs are indexed in ?IDLE_ACCESS
+ ok;
+remove_idle_access(#proc{db_key = <<_/binary>>} = Proc) ->
+ #proc{last_use_ts = Ts, lang = Lang, pid = Pid} = Proc,
+ true = ets:delete(?IDLE_ACCESS, {Lang, Ts, Pid}),
+ ok.
+insert_idle_by_db(#proc{} = Proc) ->
+ #proc{lang = Lang, pid = Pid, db_key = Db, ddoc_keys = #{} = DDocs} = Proc,
+ % An extra assert that only expect to insert a new object
+ true = ets:insert_new(?IDLE_BY_DB, {{Lang, Db, Pid}, DDocs}),
+ ok.
+remove_idle_by_db(#proc{} = Proc) ->
+ #proc{lang = Lang, pid = Pid, db_key = Db} = Proc,
+ true = ets:delete(?IDLE_BY_DB, {Lang, Db, Pid}),
+ ok.
split_string_if_longer(String, Pos) ->
case length(String) > Pos of
true -> lists:split(Pos, String);
@@ -435,7 +551,10 @@ new_proc_int(From, Lang) when is_binary(Lang) ->
LangStr = binary_to_list(Lang),
case get_query_server(LangStr) of
undefined ->
- gen_server:reply(From, {unknown_query_language, Lang});
+ case From of
+ undefined -> ok;
+ {_, _} -> gen_server:reply(From, {unknown_query_language, Lang})
+ end;
{M, F, A} ->
{ok, Pid} = apply(M, F, A),
make_proc(Pid, Lang, M);
@@ -444,102 +563,98 @@ new_proc_int(From, Lang) when is_binary(Lang) ->
make_proc(Pid, Lang, couch_os_process)
-teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc_int{ddoc_keys=Keys}=Proc) ->
+teach_ddoc(DDoc, DbKey, {DDocId, _Rev} = DDocKey, #proc{ddoc_keys = #{} = Keys} = Proc) ->
% send ddoc over the wire
% we only share the rev with the client we know to update code
% but it only keeps the latest copy, per each ddoc, around.
- true = couch_query_servers:proc_prompt(
- export_proc(Proc),
- [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]),
+ JsonDoc = couch_doc:to_json_obj(DDoc, []),
+ Prompt = [<<"ddoc">>, <<"new">>, DDocId, JsonDoc],
+ true = couch_query_servers:proc_prompt(Proc, Prompt),
% we should remove any other ddocs keys for this docid
% because the query server overwrites without the rev
- Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
+ Keys2 = maps:filter(fun({Id, _}, true) -> Id =/= DDocId end, Keys),
% add ddoc to the proc
- {ok, Proc#proc_int{ddoc_keys=[DDocKey|Keys2]}}.
+ {ok, Proc#proc{db_key = DbKey, ddoc_keys = Keys2#{DDocKey => true}}}.
make_proc(Pid, Lang, Mod) when is_binary(Lang) ->
- Proc = #proc_int{
+ Proc = #proc{
lang = Lang,
pid = Pid,
prompt_fun = {Mod, prompt},
set_timeout_fun = {Mod, set_timeout},
- stop_fun = {Mod, stop}
+ stop_fun = {Mod, stop},
+ threshold_ts = timestamp(),
+ last_use_ts = timestamp()
{ok, Proc}.
-assign_proc(Pid, #proc_int{client=undefined}=Proc0) when is_pid(Pid) ->
- Proc = Proc0#proc_int{client = erlang:monitor(process, Pid)},
+assign_proc(Pid, #proc{client = undefined} = Proc0) when is_pid(Pid) ->
+ Proc = Proc0#proc{client = erlang:monitor(process, Pid)},
+ % It's important to insert the proc here instead of doing an update_element
+ % as we might have updated the db_key or ddoc_keys in teach_ddoc/4
ets:insert(?PROCS, Proc),
- export_proc(Proc);
-assign_proc(#client{}=Client, #proc_int{client=undefined}=Proc) ->
+ Proc;
+assign_proc(#client{} = Client, #proc{client = undefined} = Proc) ->
{Pid, _} = Client#client.from,
assign_proc(Pid, Proc).
-return_proc(#state{} = State, #proc_int{} = ProcInt) ->
- #proc_int{pid = Pid, lang = Lang} = ProcInt,
- NewState = case is_process_alive(Pid) of true ->
- case ProcInt#proc_int.t0 < State#state.threshold_ts of
- true ->
- remove_proc(State, ProcInt);
- false ->
- gen_server:cast(Pid, garbage_collect),
- true = ets:update_element(?PROCS, Pid, [
- {#proc_int.client, undefined}
- ]),
- State
- end;
- false ->
- remove_proc(State, ProcInt)
+return_proc(#state{} = State, #proc{} = Proc) ->
+ #proc{pid = Pid, lang = Lang} = Proc,
+ case is_process_alive(Pid) of
+ true ->
+ case Proc#proc.threshold_ts < State#state.threshold_ts of
+ true ->
+ ok = remove_proc(Proc);
+ false ->
+ gen_server:cast(Pid, garbage_collect),
+ Ts = timestamp(),
+ true = ets:update_element(?PROCS, Pid, [
+ {#proc.client, undefined},
+ {#proc.last_use_ts, Ts}
+ ]),
+ Proc1 = Proc#proc{client = undefined, last_use_ts = Ts},
+ insert_idle_access(Proc1, Ts),
+ insert_idle_by_db(Proc1)
+ end;
+ false ->
+ ok = remove_proc(Proc)
- flush_waiters(NewState, Lang).
-remove_proc(State, #proc_int{}=Proc) ->
- ets:delete(?PROCS,,
- case is_process_alive( of true ->
- unlink(,
- gen_server:cast(, stop);
- false ->
- ok
+ ok = flush_waiters(State, Lang).
+remove_proc(#proc{pid = Pid} = Proc) ->
+ remove_idle_access(Proc),
+ remove_idle_by_db(Proc),
+ ets:delete(?PROCS, Pid),
+ case is_process_alive(Pid) of
+ true->
+ unlink(Pid),
+ gen_server:cast(Pid, stop);
+ false ->
+ ok
- Counts = State#state.counts,
- Lang = Proc#proc_int.lang,
- State#state{
- counts = dict:update_counter(Lang, -1, Counts)
- }.
--spec export_proc(#proc_int{}) -> #proc{}.
-export_proc(#proc_int{} = ProcInt) ->
- ProcIntList = tuple_to_list(ProcInt),
- ProcLen = record_info(size, proc),
- [_ | Data] = lists:sublist(ProcIntList, ProcLen),
- list_to_tuple([proc | Data]).
-flush_waiters(State) ->
- dict:fold(fun(Lang, Count, StateAcc) ->
- case Count < State#state.hard_limit of
- true ->
- flush_waiters(StateAcc, Lang);
- false ->
- StateAcc
- end
- end, State, State#state.counts).
+ dec_count(Proc#proc.lang).
-flush_waiters(State, Lang) ->
- CanSpawn = can_spawn(State, Lang),
+flush_waiters(#state{} = State, Lang) ->
+ #state{hard_limit = HardLimit, config = {[_ | _] = Cfg}} = State,
+ TimeoutMSec = couch_util:get_value(<<"timeout">>, Cfg),
+ Timeout = erlang:convert_time_unit(TimeoutMSec, millisecond, native),
+ StaleLimit = timestamp() - Timeout,
case get_waiting_client(Lang) of
+ #client{wait_key = {_, T, _}} = Client when is_integer(T), T < StaleLimit ->
+ % Client waited too long and the gen_server call timeout
+ % likey fired already, don't bother allocating a process for it
+ remove_waiting_client(Client),
+ flush_waiters(State, Lang);
#client{from = From} = Client ->
- case find_proc(Client) of
- {ok, ProcInt} ->
- Proc = assign_proc(Client, ProcInt),
+ CanSpawn = get_count(Lang) < HardLimit,
+ case find_proc(Client, CanSpawn) of
+ {ok, Proc0} ->
+ Proc = assign_proc(Client, Proc0),
gen_server:reply(From, {ok, Proc, State#state.config}),
flush_waiters(State, Lang);
@@ -548,44 +663,56 @@ flush_waiters(State, Lang) ->
flush_waiters(State, Lang);
not_found when CanSpawn ->
- NewState = spawn_proc(State, Client),
+ ok = spawn_proc(Client),
- flush_waiters(NewState, Lang);
+ flush_waiters(State, Lang);
not_found ->
- State
+ % 10% of limit
+ ReapBatch = round(HardLimit * 0.1 + 1),
+ case reap_idle(ReapBatch, Lang) of
+ N when is_integer(N), N > 0 ->
+ % We may have room available to spawn
+ case get_count(Lang) < HardLimit of
+ true ->
+ ok = spawn_proc(Client),
+ remove_waiting_client(Client),
+ flush_waiters(State, Lang);
+ false ->
+ ok
+ end;
+ 0 ->
+ ok
+ end
undefined ->
- State
+ ok
-add_waiting_client(Client) ->
- ets:insert(?WAITERS, Client#client{timestamp=os:timestamp()}).
+add_waiting_client(#client{from = {_Pid, Tag}, lang = Lang} = Client) ->
+ % Use Lang in the key first since we can look it up using a partially bound
+ % in get_waiting_client/2. Use the reply tag to provide uniqueness.
+ Key = {Lang, timestamp(), Tag},
+ true = ets:insert_new(?WAITERS, Client#client{wait_key = Key}).
-spec get_waiting_client(Lang :: binary()) -> undefined | #client{}.
get_waiting_client(Lang) ->
- case ets:match_object(?WAITERS, #client{lang=Lang, _='_'}, 1) of
+ % Use a partially bound key (Lang) to avoid scanning unrelated procs
+ Key = {Lang, '_', '_'},
+ case ets:match_object(?WAITERS, #client{wait_key = Key, _ = '_'}, 1) of
'$end_of_table' ->
{[#client{}=Client], _} ->
+remove_waiting_client(#client{wait_key = Key}) ->
+ ets:delete(?WAITERS, Key).
-remove_waiting_client(#client{timestamp = Timestamp}) ->
- ets:delete(?WAITERS, Timestamp).
-can_spawn(#state{hard_limit = HardLimit, counts = Counts}, Lang) ->
- case dict:find(Lang, Counts) of
- {ok, Count} -> Count < HardLimit;
- error -> true
- end.
get_proc_config() ->
Limit = config:get_boolean("query_server_config", "reduce_limit", true),
- Timeout = config:get_integer("couchdb", "os_process_timeout", 5000),
+ Timeout = get_os_process_timeout(),
{<<"reduce_limit">>, Limit},
{<<"timeout">>, Timeout}
@@ -593,9 +720,37 @@ get_proc_config() ->
get_hard_limit() ->
- LimStr = config:get("query_server_config", "os_process_limit", "100"),
- list_to_integer(LimStr).
+ config:get_integer("query_server_config", "os_process_limit", 100).
get_soft_limit() ->
config:get_integer("query_server_config", "os_process_soft_limit", 100).
+get_os_process_timeout() ->
+ config:get_integer("couchdb", "os_process_timeout", 5000).
+timestamp() ->
+ erlang:monotonic_time().
+foreach_proc(Fun) when is_function(Fun, 1) ->
+ FoldFun = fun(#proc{} = Proc, ok) ->
+ Fun(Proc),
+ ok
+ end,
+ ok = ets:foldl(FoldFun, ok, ?PROCS).
+inc_count(Lang) ->
+ ets:update_counter(?COUNTERS, Lang, 1, {Lang, 0}),
+ ok.
+dec_count(Lang) ->
+ ets:update_counter(?COUNTERS, Lang, -1, {Lang, 0}),
+ ok.
+get_count(Lang) ->
+ case ets:lookup(?COUNTERS, Lang) of
+ [{_, Count}] when is_integer(Count) ->
+ Count;
+ [] ->
+ 0
+ end.
diff --git a/src/couch/src/couch_query_servers.erl b/src/couch/src/couch_query_servers.erl
index 10b8048dd..43a1b7fa6 100644
--- a/src/couch/src/couch_query_servers.erl
+++ b/src/couch/src/couch_query_servers.erl
@@ -14,16 +14,16 @@
-export([start_doc_map/3, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]).
--export([reduce/3, rereduce/3,validate_doc_update/5]).
+-export([reduce/3, rereduce/3, validate_doc_update/6]).
--export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]).
+-export([with_ddoc_proc/3, proc_prompt/2, ddoc_prompt/4, ddoc_proc_prompt/3, json_doc/1]).
% For 210-os-proc-pool.t
--export([get_os_process/1, get_ddoc_process/2, ret_os_process/1]).
+-export([get_os_process/1, get_ddoc_process/3, ret_os_process/1]).
@@ -355,17 +355,19 @@ approx_count_distinct(rereduce, Reds) ->
hyper:union([Filter || [_, Filter] <- Reds]).
% use the function stored in ddoc.validate_doc_update to test an update.
--spec validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> ok when
+-spec validate_doc_update(Db, DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> ok when
+ Db :: term(),
DDoc :: ddoc(),
EditDoc :: doc(),
DiskDoc :: doc() | nil,
Ctx :: user_ctx(),
SecObj :: sec_obj().
-validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) ->
+validate_doc_update(Db, DDoc, EditDoc, DiskDoc, Ctx, SecObj) ->
JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
JsonDiskDoc = json_doc(DiskDoc),
Resp = ddoc_prompt(
+ Db,
[JsonEditDoc, JsonDiskDoc, Ctx, SecObj]
@@ -392,7 +394,7 @@ rewrite(Req, Db, DDoc) ->
F =/= <<"info">>, F =/= <<"form">>,
F =/= <<"uuid">>, F =/= <<"id">>],
JsonReq = chttpd_external:json_req_obj(Req, Db, null, Fields),
- case couch_query_servers:ddoc_prompt(DDoc, [<<"rewrites">>], [JsonReq]) of
+ case ddoc_prompt(Db, DDoc, [<<"rewrites">>], [JsonReq]) of
{[{<<"forbidden">>, Message}]} ->
throw({forbidden, Message});
{[{<<"unauthorized">>, Message}]} ->
@@ -480,10 +482,10 @@ json_doc(nil, _) ->
json_doc(Doc, Options) ->
couch_doc:to_json_obj(Doc, Options).
-filter_view(DDoc, VName, Docs) ->
+filter_view(Db, DDoc, VName, Docs) ->
Options = json_doc_options(),
JsonDocs = [json_doc(Doc, Options) || Doc <- Docs],
- [true, Passes] = ddoc_prompt(DDoc, [<<"views">>, VName, <<"map">>], [JsonDocs]),
+ [true, Passes] = ddoc_prompt(Db, DDoc, [<<"views">>, VName, <<"map">>], [JsonDocs]),
{ok, Passes}.
filter_docs(Req, Db, DDoc, FName, Docs) ->
@@ -496,33 +498,34 @@ filter_docs(Req, Db, DDoc, FName, Docs) ->
Options = json_doc_options(),
JsonDocs = [json_doc(Doc, Options) || Doc <- Docs],
- {ok, filter_docs_int(DDoc, FName, JsonReq, JsonDocs)}
+ {ok, filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs)}
throw:{os_process_error,{exit_status,1}} ->
%% batch used too much memory, retry sequentially.
Fun = fun(JsonDoc) ->
- filter_docs_int(DDoc, FName, JsonReq, [JsonDoc])
+ filter_docs_int(Db, DDoc, FName, JsonReq, [JsonDoc])
{ok, lists:flatmap(Fun, JsonDocs)}
-filter_docs_int(DDoc, FName, JsonReq, JsonDocs) ->
- [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName],
+filter_docs_int(Db, DDoc, FName, JsonReq, JsonDocs) ->
+ [true, Passes] = ddoc_prompt(Db, DDoc, [<<"filters">>, FName],
[JsonDocs, JsonReq]),
ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) ->
proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]).
-ddoc_prompt(DDoc, FunPath, Args) ->
- with_ddoc_proc(DDoc, fun({Proc, DDocId}) ->
+ddoc_prompt(Db, DDoc, FunPath, Args) ->
+ with_ddoc_proc(Db, DDoc, fun({Proc, DDocId}) ->
proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args])
-with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
+with_ddoc_proc(Db, #doc{id = DDocId, revs = {Start, [DiskRev | _]}} = DDoc, Fun) ->
Rev = couch_doc:rev_to_str({Start, DiskRev}),
+ DbKey = db_key(Db),
DDocKey = {DDocId, Rev},
- Proc = get_ddoc_process(DDoc, DDocKey),
+ Proc = get_ddoc_process(DDoc, DbKey, DDocKey),
try Fun({Proc, DDocId}) of
Resp ->
ok = ret_os_process(Proc),
@@ -532,6 +535,22 @@ with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
erlang:raise(Tag, Err, Stack)
+db_key(DbName) when is_binary(DbName) ->
+ Name = mem3:dbname(DbName),
+ case config:get("query_server_config", "db_tag", "name") of
+ "prefix" ->
+ case binary:split(Name, <<"/">>) of
+ [Prefix, _] when byte_size(Prefix) > 0 -> Prefix;
+ _ -> Name
+ end;
+ "none" ->
+ undefined;
+ _ ->
+ Name
+ end;
+db_key(Db) ->
+ db_key(couch_db:name(Db)).
proc_prompt(Proc, Args) ->
case proc_prompt_raw(Proc, Args) of
{json, Json} ->
@@ -622,12 +641,9 @@ proc_set_timeout(Proc, Timeout) ->
{Mod, Func} = Proc#proc.set_timeout_fun,
apply(Mod, Func, [, Timeout]).
-get_os_process_timeout() ->
- config:get_integer("couchdb", "os_process_timeout", 5000).
-get_ddoc_process(#doc{} = DDoc, DDocKey) ->
+get_ddoc_process(#doc{} = DDoc, DbKey, DDocKey) ->
% remove this case statement
- case gen_server:call(couch_proc_manager, {get_proc, DDoc, DDocKey}, get_os_process_timeout()) of
+ case couch_proc_manager:get_proc(DDoc, DbKey, DDocKey) of
{ok, Proc, {QueryConfig}} ->
% process knows the ddoc
case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
@@ -636,14 +652,14 @@ get_ddoc_process(#doc{} = DDoc, DDocKey) ->
_ ->
catch proc_stop(Proc),
- get_ddoc_process(DDoc, DDocKey)
+ get_ddoc_process(DDoc, DbKey, DDocKey)
Error ->
get_os_process(Lang) ->
- case gen_server:call(couch_proc_manager, {get_proc, Lang}, get_os_process_timeout()) of
+ case couch_proc_manager:get_proc(Lang) of
{ok, Proc, {QueryConfig}} ->
case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
true ->
@@ -658,7 +674,7 @@ get_os_process(Lang) ->
ret_os_process(Proc) ->
- true = gen_server:call(couch_proc_manager, {ret_proc, Proc}, infinity),
+ true = couch_proc_manager:ret_proc(Proc),
catch unlink(,
@@ -670,7 +686,10 @@ throw_stat_error(Else) ->
+-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end).
builtin_sum_rows_negative_test() ->
A = [{[{<<"a">>, 1}]}, {[{<<"a">>, 2}]}, {[{<<"a">>, 3}]}],
@@ -799,4 +818,42 @@ force_utf8_test() ->
end, NotOk).
+db_key_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(t_db_key_default),
+ ?TDEF_FE(t_db_key_prefix),
+ ?TDEF_FE(t_db_key_none)
+ ]
+ }.
+setup() ->
+ meck:new(config, [passthrough]),
+ meck:expect(config, get, fun(_, _, Default) -> Default end),
+ ok.
+teardown(_) ->
+ meck:unload().
+t_db_key_default(_) ->
+ ?assertEqual(<<"foo">>, db_key(<<"foo">>)),
+ ?assertEqual(<<"foo/bar">>, db_key(<<"foo/bar">>)),
+ ?assertEqual(<<"foo/bar">>, db_key(<<"shards/00000000-3fffffff/foo/bar.1415960794">>)).
+t_db_key_prefix(_) ->
+ meck:expect(config, get, fun(_, "db_tag", _) -> "prefix" end),
+ ?assertEqual(<<"foo">>, db_key(<<"foo">>)),
+ ?assertEqual(<<"foo">>, db_key(<<"foo/bar">>)),
+ ?assertEqual(<<"foo">>, db_key(<<"shards/00000000-3fffffff/foo/bar.1415960794">>)),
+ ?assertEqual(<<"/foo">>, db_key(<<"/foo">>)).
+t_db_key_none(_) ->
+ meck:expect(config, get, fun(_, "db_tag", _) -> "none" end),
+ ?assertEqual(undefined, db_key(<<"foo">>)),
+ ?assertEqual(undefined, db_key(<<"foo/bar">>)),
+ ?assertEqual(undefined, db_key(<<"shards/00000000-3fffffff/foo/bar.1415960794">>)).
diff --git a/src/couch/test/eunit/couch_auth_cache_tests.erl b/src/couch/test/eunit/couch_auth_cache_tests.erl
index 71faf77d6..ceed68148 100644
--- a/src/couch/test/eunit/couch_auth_cache_tests.erl
+++ b/src/couch/test/eunit/couch_auth_cache_tests.erl
@@ -345,5 +345,6 @@ validate(DiskDoc, NewDoc) ->
validate(DiskDoc, NewDoc, JSONCtx) ->
{ok, DDoc0} = couch_auth_cache:auth_design_doc(<<"_design/anything">>),
+ Db = <<"validate_couch_auth_cache_tests">>,
DDoc = DDoc0#doc{revs = {1, [<<>>]}},
- couch_query_servers:validate_doc_update(DDoc, NewDoc, DiskDoc, JSONCtx, []).
+ couch_query_servers:validate_doc_update(Db, DDoc, NewDoc, DiskDoc, JSONCtx, []).
diff --git a/src/couch/test/eunit/couch_query_servers_tests.erl b/src/couch/test/eunit/couch_query_servers_tests.erl
index 440fc8e1b..d142498d6 100644
--- a/src/couch/test/eunit/couch_query_servers_tests.erl
+++ b/src/couch/test/eunit/couch_query_servers_tests.erl
@@ -110,7 +110,7 @@ should_return_object_on_false() ->
should_split_large_batches() ->
Req = {json_req, {[]}},
- Db = undefined,
+ Db = <<"somedb">>,
DDoc = #doc{
id = <<"_design/foo">>,
revs = {0, [<<"bork bork bork">>]},
diff --git a/src/couch/test/eunit/couchdb_os_proc_pool.erl b/src/couch/test/eunit/couchdb_os_proc_pool.erl
index b552e114a..645385376 100644
--- a/src/couch/test/eunit/couchdb_os_proc_pool.erl
+++ b/src/couch/test/eunit/couchdb_os_proc_pool.erl
@@ -15,240 +15,492 @@
+-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end).
-define(TIMEOUT, 1000).
setup() ->
- ok = couch_proc_manager:reload(),
+ Ctx = test_util:start_couch(),
meck:new(couch_os_process, [passthrough]),
- ok = setup_config().
+ meck:new(couch_proc_manager, [passthrough]),
+ ok = setup_config(),
+ Ctx.
-teardown(_) ->
+teardown(Ctx) ->
+ ok = teardown_config(),
+ test_util:stop_couch(Ctx),
os_proc_pool_test_() ->
"OS processes pool tests",
- setup,
- fun test_util:start_couch/0, fun test_util:stop_couch/1,
- {
- foreach,
- fun setup/0, fun teardown/1,
- [
- should_block_new_proc_on_full_pool(),
- should_free_slot_on_proc_unexpected_exit(),
- should_reuse_known_proc(),
-% should_process_waiting_queue_as_fifo(),
- should_reduce_pool_on_idle_os_procs(),
- should_not_return_broken_process_to_the_pool()
- ]
- }
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(should_block_new_proc_on_full_pool),
+ ?TDEF_FE(should_free_slot_on_proc_unexpected_exit),
+ ?TDEF_FE(should_reuse_known_proc),
+ ?TDEF_FE(should_process_waiting_queue_as_fifo),
+ ?TDEF_FE(should_reduce_pool_on_idle_os_procs),
+ ?TDEF_FE(should_reduce_pool_of_tagged_processes_on_idle),
+ ?TDEF_FE(should_not_return_broken_process_to_the_pool),
+ ?TDEF_FE(oldest_tagged_process_is_reaped),
+ ?TDEF_FE(untagged_process_is_replenished),
+ ?TDEF_FE(exact_ddoc_tagged_process_is_picked_first),
+ ?TDEF_FE(db_tagged_process_is_second_choice),
+ ?TDEF_FE(if_no_tagged_process_found_new_must_be_spawned),
+ ?TDEF_FE(db_tag_none_works),
+ ?TDEF_FE(stale_procs_are_cleaned),
+ ?TDEF_FE(bad_query_language)
+ ]
+should_block_new_proc_on_full_pool(_) ->
+ Client1 = spawn_client(),
+ Client2 = spawn_client(),
+ Client3 = spawn_client(),
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ ?assertEqual(ok, ping_client(Client3)),
+ Proc1 = get_client_proc(Client1, "1"),
+ Proc2 = get_client_proc(Client2, "2"),
+ Proc3 = get_client_proc(Client3, "3"),
+ ?assertNotEqual(Proc1, Proc2),
+ ?assertNotEqual(Proc2, Proc3),
+ ?assertNotEqual(Proc3, Proc1),
+ Client4 = spawn_client(),
+ ?assertEqual(timeout, ping_client(Client4)),
+ ?assertEqual(ok, stop_client(Client1)),
+ ?assertEqual(ok, ping_client(Client4)),
+ Proc4 = get_client_proc(Client4, "4"),
+ ?assertEqual(,,
+ ?assertNotEqual(Proc1#proc.client, Proc4#proc.client),
+ stop_clients([Client2, Client3, Client4]).
+should_free_slot_on_proc_unexpected_exit(_) ->
+ Client1 = spawn_client(),
+ Client2 = spawn_client(),
+ Client3 = spawn_client(),
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ ?assertEqual(ok, ping_client(Client3)),
+ Proc1 = get_client_proc(Client1, "1"),
+ Proc2 = get_client_proc(Client2, "2"),
+ Proc3 = get_client_proc(Client3, "3"),
+ ?assertNotEqual(,,
+ ?assertNotEqual(Proc1#proc.client, Proc2#proc.client),
+ ?assertNotEqual(,,
+ ?assertNotEqual(Proc2#proc.client, Proc3#proc.client),
+ ?assertNotEqual(,,
+ ?assertNotEqual(Proc3#proc.client, Proc1#proc.client),
+ ?assertEqual(ok, kill_client(Client1)),
+ Client4 = spawn_client(),
+ ?assertEqual(ok, ping_client(Client4)),
+ Proc4 = get_client_proc(Client4, "4"),
+ ?assertEqual(,,
+ ?assertNotEqual(Proc4#proc.client, Proc1#proc.client),
+ ?assertNotEqual(,,
+ ?assertNotEqual(Proc2#proc.client, Proc4#proc.client),
+ ?assertNotEqual(,,
+ ?assertNotEqual(Proc3#proc.client, Proc4#proc.client),
+ stop_clients([Client2, Client3, Client4]).
+should_reuse_known_proc(_) ->
+ Db = <<"db">>,
+ Client1 = spawn_client(Db, <<"ddoc1">>),
+ Client2 = spawn_client(Db, <<"ddoc2">>),
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ Proc1 = get_client_proc(Client1, "1"),
+ Proc2 = get_client_proc(Client2, "2"),
+ ?assertNotEqual(,,
+ ?assertEqual(ok, stop_client(Client1)),
+ ?assertEqual(ok, stop_client(Client2)),
+ ?assert(is_process_alive(,
+ ?assert(is_process_alive(,
+ Client1Again = spawn_client(Db, <<"ddoc1">>),
+ ?assertEqual(ok, ping_client(Client1Again)),
+ Proc1Again = get_client_proc(Client1Again, "1-again"),
+ ?assertEqual(,,
+ ?assertNotEqual(Proc1#proc.client, Proc1Again#proc.client),
+ ?assertEqual(ok, stop_client(Client1Again)).
-should_block_new_proc_on_full_pool() ->
- ?_test(begin
- Client1 = spawn_client(),
- Client2 = spawn_client(),
- Client3 = spawn_client(),
+should_process_waiting_queue_as_fifo(_) ->
+ Db = <<"db">>,
+ meck:reset(couch_proc_manager),
+ Client1 = spawn_client(Db, <<"ddoc1">>),
+ meck:wait(1, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000),
+ Client2 = spawn_client(Db, <<"ddoc2">>),
+ meck:wait(2, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000),
+ Client3 = spawn_client(Db, <<"ddoc3">>),
+ meck:wait(3, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000),
+ Client4 = spawn_client(Db, <<"ddoc4">>),
+ meck:wait(4, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000),
+ Client5 = spawn_client(Db, <<"ddoc5">>),
+ meck:wait(5, couch_proc_manager, handle_call, [{get_proc, '_'}, '_', '_'], 1000),
- ?assertEqual(ok, ping_client(Client1)),
- ?assertEqual(ok, ping_client(Client2)),
- ?assertEqual(ok, ping_client(Client3)),
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ ?assertEqual(ok, ping_client(Client3)),
+ ?assertEqual(timeout, ping_client(Client4)),
+ ?assertEqual(timeout, ping_client(Client5)),
- Proc1 = get_client_proc(Client1, "1"),
- Proc2 = get_client_proc(Client2, "2"),
- Proc3 = get_client_proc(Client3, "3"),
+ Proc1 = get_client_proc(Client1, "1"),
+ ?assertEqual(ok, stop_client(Client1)),
+ ?assertEqual(ok, ping_client(Client4)),
+ Proc4 = get_client_proc(Client4, "4"),
- ?assertNotEqual(Proc1, Proc2),
- ?assertNotEqual(Proc2, Proc3),
- ?assertNotEqual(Proc3, Proc1),
+ ?assertNotEqual(Proc4#proc.client, Proc1#proc.client),
+ ?assertEqual(,,
+ ?assertEqual(timeout, ping_client(Client5)),
- Client4 = spawn_client(),
- ?assertEqual(timeout, ping_client(Client4)),
+ ?assertEqual(ok, stop_client(Client2)),
+ ?assertEqual(ok, stop_client(Client3)),
+ ?assertEqual(ok, stop_client(Client4)),
+ ?assertEqual(ok, stop_client(Client5)).
- ?assertEqual(ok, stop_client(Client1)),
- ?assertEqual(ok, ping_client(Client4)),
+should_reduce_pool_on_idle_os_procs(_) ->
+ %% os_process_idle_limit is in sec
+ cfg_set("os_process_idle_limit", "1"),
- Proc4 = get_client_proc(Client4, "4"),
+ Db = undefined,
+ Client1 = spawn_client(Db, <<"ddoc1">>),
+ Client2 = spawn_client(Db, <<"ddoc2">>),
+ Client3 = spawn_client(Db, <<"ddoc3">>),
- ?assertEqual(,,
- ?assertNotEqual(Proc1#proc.client, Proc4#proc.client),
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ ?assertEqual(ok, ping_client(Client3)),
- lists:map(fun(C) ->
- ?assertEqual(ok, stop_client(C))
- end, [Client2, Client3, Client4])
- end).
+ ?assertEqual(3, couch_proc_manager:get_proc_count()),
+ ?assertEqual(ok, stop_client(Client1)),
+ ?assertEqual(ok, stop_client(Client2)),
+ ?assertEqual(ok, stop_client(Client3)),
-should_free_slot_on_proc_unexpected_exit() ->
- ?_test(begin
- Client1 = spawn_client(),
- Client2 = spawn_client(),
- Client3 = spawn_client(),
+ % granularity of idle limit is in seconds
+ timer:sleep(1000),
+ wait_process_count(1).
- ?assertEqual(ok, ping_client(Client1)),
- ?assertEqual(ok, ping_client(Client2)),
- ?assertEqual(ok, ping_client(Client3)),
+should_reduce_pool_of_tagged_processes_on_idle(_) ->
+ %% os_process_idle_limit is in sec
+ cfg_set("os_process_idle_limit", "1"),
- Proc1 = get_client_proc(Client1, "1"),
- Proc2 = get_client_proc(Client2, "2"),
- Proc3 = get_client_proc(Client3, "3"),
+ Db = <<"reduce_pool_on_idle_db">>,
+ Client1 = spawn_client(Db, <<"ddoc1">>),
+ Client2 = spawn_client(Db, <<"ddoc2">>),
+ Client3 = spawn_client(Db, <<"ddoc3">>),
- ?assertNotEqual(,,
- ?assertNotEqual(Proc1#proc.client, Proc2#proc.client),
- ?assertNotEqual(,,
- ?assertNotEqual(Proc2#proc.client, Proc3#proc.client),
- ?assertNotEqual(,,
- ?assertNotEqual(Proc3#proc.client, Proc1#proc.client),
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ ?assertEqual(ok, ping_client(Client3)),
- ?assertEqual(ok, kill_client(Client1)),
+ ?assertEqual(3, couch_proc_manager:get_proc_count()),
- Client4 = spawn_client(),
- ?assertEqual(ok, ping_client(Client4)),
+ stop_clients([Client1, Client2, Client3]),
- Proc4 = get_client_proc(Client4, "4"),
+ timer:sleep(1000),
+ wait_process_count(0).
- ?assertEqual(,,
- ?assertNotEqual(Proc4#proc.client, Proc1#proc.client),
- ?assertNotEqual(,,
- ?assertNotEqual(Proc2#proc.client, Proc4#proc.client),
- ?assertNotEqual(,,
- ?assertNotEqual(Proc3#proc.client, Proc4#proc.client),
+oldest_tagged_process_is_reaped(_) ->
+ Client1 = spawn_client(<<"db1">>, <<"ddoc1">>),
+ Client2 = spawn_client(<<"db2">>, <<"ddoc1">>),
+ Client3 = spawn_client(<<"db3">>, <<"ddoc1">>),
- lists:map(fun(C) ->
- ?assertEqual(ok, stop_client(C))
- end, [Client2, Client3, Client4])
- end).
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ ?assertEqual(ok, ping_client(Client3)),
+ Proc1 = get_client_proc(Client1, "1"),
+ Proc2 = get_client_proc(Client2, "2"),
+ Proc3 = get_client_proc(Client3, "3"),
-should_reuse_known_proc() ->
- ?_test(begin
- Client1 = spawn_client(<<"ddoc1">>),
- Client2 = spawn_client(<<"ddoc2">>),
+ ?assert(all_alive_all_different([Proc1, Proc2, Proc3])),
- ?assertEqual(ok, ping_client(Client1)),
- ?assertEqual(ok, ping_client(Client2)),
+ stop_clients([Client1, Client2, Client3]),
- Proc1 = get_client_proc(Client1, "1"),
- Proc2 = get_client_proc(Client2, "2"),
- ?assertNotEqual(,,
+ % All procs should be released back into the pool
+ wait_tagged_idle_count(3),
- ?assertEqual(ok, stop_client(Client1)),
- ?assertEqual(ok, stop_client(Client2)),
- ?assert(is_process_alive(,
- ?assert(is_process_alive(,
+ % Processes should be alive
+ ?assert(all_alive([Proc1, Proc2, Proc3])),
- Client1Again = spawn_client(<<"ddoc1">>),
- ?assertEqual(ok, ping_client(Client1Again)),
- Proc1Again = get_client_proc(Client1Again, "1-again"),
- ?assertEqual(,,
- ?assertNotEqual(Proc1#proc.client, Proc1Again#proc.client),
- ?assertEqual(ok, stop_client(Client1Again))
- end).
-%should_process_waiting_queue_as_fifo() ->
-% ?_test(begin
-% Client1 = spawn_client(<<"ddoc1">>),
-% Client2 = spawn_client(<<"ddoc2">>),
-% Client3 = spawn_client(<<"ddoc3">>),
-% Client4 = spawn_client(<<"ddoc4">>),
-% timer:sleep(100),
-% Client5 = spawn_client(<<"ddoc5">>),
-% ?assertEqual(ok, ping_client(Client1)),
-% ?assertEqual(ok, ping_client(Client2)),
-% ?assertEqual(ok, ping_client(Client3)),
-% ?assertEqual(timeout, ping_client(Client4)),
-% ?assertEqual(timeout, ping_client(Client5)),
-% Proc1 = get_client_proc(Client1, "1"),
-% ?assertEqual(ok, stop_client(Client1)),
-% ?assertEqual(ok, ping_client(Client4)),
-% Proc4 = get_client_proc(Client4, "4"),
-% ?assertNotEqual(Proc4#proc.client, Proc1#proc.client),
-% ?assertEqual(,,
-% ?assertEqual(timeout, ping_client(Client5)),
-% ?assertEqual(ok, stop_client(Client2)),
-% ?assertEqual(ok, stop_client(Client3)),
-% ?assertEqual(ok, stop_client(Client4)),
-% ?assertEqual(ok, stop_client(Client5))
-% end).
+ % Spawning a new tagged proc with a different tag should kill
+ % the oldest unused proc and spawn a new one
+ Client4 = spawn_client(<<"db4">>, <<"ddoc1">>),
+ ?assertEqual(ok, ping_client(Client4)),
+ Proc4 = get_client_proc(Client4, "4"),
+ ?assert(all_alive_all_different([Proc2, Proc3, Proc4])),
+ ?assertNot(is_process_alive(,
+ ?assertEqual(ok, stop_client(Client4)).
+untagged_process_is_replenished(_) ->
+ Client1 = spawn_client(),
+ Client2 = spawn_client(),
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ Proc1 = get_client_proc(Client1, "1"),
+ Proc2 = get_client_proc(Client2, "2"),
+ ?assert(all_alive_all_different([Proc1, Proc2])),
+ stop_clients([Client1, Client2]),
+ % All procs should be released back into the pool
+ % and they are now all untagged idle
+ wait_idle_count(2),
+ ?assertEqual(0, tagged_idle_count()),
+ % Processes should still be alive
+ ?assert(all_alive([Proc1, Proc2])),
+ % Spawning a new tagged proc should tag one of the procs
+ % and also asynchronously replenish the untagged pool
+ Client3 = spawn_client(<<"db">>, <<"ddoc1">>),
+ ?assertEqual(ok, ping_client(Client3)),
+ % The process is one of the previously untagged ones
+ Proc3 = get_client_proc(Client3, "3"),
+ Pid3 =,
+ ?assert(lists:member(Pid3, proc_pids([Proc1, Proc2]))),
+ % wait for replinishment
+ wait_idle_count(2),
+ ?assertEqual(ok, stop_client(Client3)).
+exact_ddoc_tagged_process_is_picked_first(_) ->
+ Client1 = spawn_client(<<"db">>, <<"ddoc1">>),
+ Client2 = spawn_client(<<"db">>, <<"ddoc2">>),
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ Proc1 = get_client_proc(Client1, "1"),
+ Proc2 = get_client_proc(Client2, "2"),
+ ?assert(all_alive_all_different([Proc1, Proc2])),
+ stop_clients([Client1, Client2]),
+ % All procs should be released back into the pool
+ % and they now tagged and idle
+ wait_tagged_idle_count(2),
+ wait_idle_count(2),
+ % Processes should still be alive
+ ?assert(all_alive([Proc1, Proc2])),
+ % Spawning a new tagged proc should pick the one with
+ % matching ddoc
+ Client3 = spawn_client(<<"db">>, <<"ddoc1">>),
+ ?assertEqual(ok, ping_client(Client3)),
+ Proc3 = get_client_proc(Client3, "3"),
+ ?assertEqual(,,
+ ?assertEqual(ok, stop_client(Client3)).
+db_tagged_process_is_second_choice(_) ->
+ Client1 = spawn_client(<<"db1">>, <<"ddoc1">>),
+ Client2 = spawn_client(<<"db2">>, <<"ddoc2">>),
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ Proc1 = get_client_proc(Client1, "1"),
+ Proc2 = get_client_proc(Client2, "2"),
+ ?assert(all_alive_all_different([Proc1, Proc2])),
-should_reduce_pool_on_idle_os_procs() ->
- ?_test(begin
- %% os_process_idle_limit is in sec
- config:set("query_server_config",
- "os_process_idle_limit", "1", false),
- ok = confirm_config("os_process_idle_limit", "1"),
+ stop_clients([Client1, Client2]),
- Client1 = spawn_client(<<"ddoc1">>),
- Client2 = spawn_client(<<"ddoc2">>),
- Client3 = spawn_client(<<"ddoc3">>),
+ % All procs should be released back into the pool
+ % and they now tagged and idle
+ wait_tagged_idle_count(2),
+ wait_idle_count(2),
- ?assertEqual(ok, ping_client(Client1)),
- ?assertEqual(ok, ping_client(Client2)),
- ?assertEqual(ok, ping_client(Client3)),
+ % Processes should still be alive
+ ?assert(all_alive([Proc1, Proc2])),
- ?assertEqual(3, couch_proc_manager:get_proc_count()),
+ % Spawning a new tagged proc should pick the one with
+ % the matching ddoc
+ Client3 = spawn_client(<<"db1">>, <<"ddoc3">>),
+ ?assertEqual(ok, ping_client(Client3)),
+ Proc3 = get_client_proc(Client3, "3"),
+ ?assertEqual(,,
- ?assertEqual(ok, stop_client(Client1)),
- ?assertEqual(ok, stop_client(Client2)),
- ?assertEqual(ok, stop_client(Client3)),
+ ?assertEqual(ok, stop_client(Client3)).
- timer:sleep(1200),
- ?assertEqual(1, couch_proc_manager:get_proc_count())
- end).
+if_no_tagged_process_found_new_must_be_spawned(_) ->
+ Client1 = spawn_client(<<"db1">>, <<"ddoc">>),
+ Client2 = spawn_client(<<"db2">>, <<"ddoc">>),
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
-should_not_return_broken_process_to_the_pool() ->
- ?_test(begin
- config:set("query_server_config",
- "os_process_soft_limit", "1", false),
- ok = confirm_config("os_process_soft_limit", "1"),
+ Proc1 = get_client_proc(Client1, "1"),
+ Proc2 = get_client_proc(Client2, "2"),
- config:set("query_server_config",
- "os_process_limit", "1", false),
- ok = confirm_config("os_process_limit", "1"),
+ ?assert(all_alive_all_different([Proc1, Proc2])),
- DDoc1 = ddoc(<<"_design/ddoc1">>),
+ stop_clients([Client1, Client2]),
- meck:reset(couch_os_process),
+ % All procs should be released back into the pool
+ % and they now tagged and idle
+ wait_tagged_idle_count(2),
+ wait_idle_count(2),
- ?assertEqual(0, couch_proc_manager:get_proc_count()),
- ok = couch_query_servers:with_ddoc_proc(DDoc1, fun(_) -> ok end),
- ?assertEqual(0, meck:num_calls(couch_os_process, stop, 1)),
- ?assertEqual(1, couch_proc_manager:get_proc_count()),
+ % Processes should still be alive
+ ?assert(all_alive([Proc1, Proc2])),
- ?assertError(bad, couch_query_servers:with_ddoc_proc(DDoc1, fun(_) ->
+ % If new tagged process with new db should spawn
+ % new process never pick up an existing one
+ Client3 = spawn_client(<<"db3">>, <<"ddoc">>),
+ ?assertEqual(ok, ping_client(Client3)),
+ Proc3 = get_client_proc(Client3, "3"),
+ ?assertNotEqual(,,
+ ?assertNotEqual(,,
+ % db1 and db2 procs should still be sitting idle
+ ?assertEqual(2, tagged_idle_count()),
+ % After 3rd proc returns to the pool there should
+ % be 3 tagged idle processes
+ ?assertEqual(ok, stop_client(Client3)),
+ wait_tagged_idle_count(3),
+ ?assertEqual(3, idle_count()).
+db_tag_none_works(_) ->
+ cfg_set("db_tag", "none"),
+ Client1 = spawn_client(undefined, <<"ddoc1">>),
+ Client2 = spawn_client(undefined, <<"ddoc2">>),
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ Proc1 = get_client_proc(Client1, "1"),
+ Proc2 = get_client_proc(Client2, "2"),
+ ?assert(all_alive_all_different([Proc1, Proc2])),
+ stop_clients([Client1, Client2]),
+ % All procs should be released back into the pool
+ % they should be untagged effectively
+ wait_idle_count(2),
+ ?assertEqual(0, tagged_idle_count()),
+ % Processes should still be alive
+ ?assert(all_alive([Proc1, Proc2])),
+ % If new tagged process with new db should spawn
+ % new process and pick based on ddoc id matching
+ Client3 = spawn_client(undefined, <<"ddoc1">>),
+ ?assertEqual(ok, ping_client(Client3)),
+ Proc3 = get_client_proc(Client3, "3"),
+ ?assertEqual(,,
+ ?assertNotEqual(,,
+ wait_idle_count(1),
+ % After 3rd client stop there should be 2 idle
+ % untagged procs
+ ?assertEqual(ok, stop_client(Client3)),
+ wait_idle_count(2),
+ ?assertEqual(0, tagged_idle_count()).
+stale_procs_are_cleaned(_) ->
+ Client1 = spawn_client(),
+ Client2 = spawn_client(),
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ Proc1 = get_client_proc(Client1, "1"),
+ Proc2 = get_client_proc(Client2, "2"),
+ ?assert(all_alive_all_different([Proc1, Proc2])),
+ ?assertEqual(0, couch_proc_manager:get_stale_proc_count()),
+ ?assertEqual(ok, couch_proc_manager:reload()),
+ ?assertEqual(2, couch_proc_manager:get_stale_proc_count()),
+ stop_clients([Client1, Client2]),
+ ?assertEqual(ok, couch_proc_manager:terminate_stale_procs()),
+ wait_idle_count(0),
+ ?assertEqual(0, couch_proc_manager:get_proc_count()).
+bad_query_language(_) ->
+ Expect = {unknown_query_language, <<"bad">>},
+ ?assertThrow(Expect, couch_query_servers:get_os_process(<<"bad">>)).
+should_not_return_broken_process_to_the_pool(_) ->
+ cfg_set("os_process_soft_limit", "1"),
+ cfg_set("os_process_limit", "1"),
+ Db = <<"thedb">>,
+ DDoc1 = ddoc(<<"_design/ddoc1">>),
+ meck:reset(couch_os_process),
+ ?assertEqual(0, couch_proc_manager:get_proc_count()),
+ ok = couch_query_servers:with_ddoc_proc(Db, DDoc1, fun(_) -> ok end),
+ ?assertEqual(0, meck:num_calls(couch_os_process, stop, 1)),
+ ?assertEqual(1, couch_proc_manager:get_proc_count()),
+ ?assertError(
+ bad,
+ couch_query_servers:with_ddoc_proc(Db, DDoc1, fun(_) ->
- end)),
- ?assertEqual(1, meck:num_calls(couch_os_process, stop, 1)),
- WaitFun = fun() ->
- case couch_proc_manager:get_proc_count() of
- 0 -> ok;
- N when is_integer(N), N > 0 -> wait
- end
- end,
- case test_util:wait(WaitFun, 5000) of
- timeout -> error(timeout);
- _ -> ok
- end,
- ?assertEqual(0, couch_proc_manager:get_proc_count()),
- DDoc2 = ddoc(<<"_design/ddoc2">>),
- ok = couch_query_servers:with_ddoc_proc(DDoc2, fun(_) -> ok end),
- ?assertEqual(1, meck:num_calls(couch_os_process, stop, 1)),
- ?assertEqual(1, couch_proc_manager:get_proc_count())
- end).
+ end)
+ ),
+ ?assertEqual(1, meck:num_calls(couch_os_process, stop, 1)),
+ WaitFun = fun() ->
+ case couch_proc_manager:get_proc_count() of
+ 0 -> ok;
+ N when is_integer(N), N > 0 -> wait
+ end
+ end,
+ case test_util:wait(WaitFun, 5000) of
+ timeout -> error(timeout);
+ _ -> ok
+ end,
+ ?assertEqual(0, couch_proc_manager:get_proc_count()),
+ DDoc2 = ddoc(<<"_design/ddoc2">>),
+ ok = couch_query_servers:with_ddoc_proc(Db, DDoc2, fun(_) -> ok end),
+ ?assertEqual(1, meck:num_calls(couch_os_process, stop, 1)),
+ ?assertEqual(1, couch_proc_manager:get_proc_count()).
ddoc(DDocId) ->
@@ -269,26 +521,14 @@ setup_config() ->
config:set("native_query_servers", "enable_erlang_query_server", "true", false),
config:set("query_server_config", "os_process_limit", "3", false),
config:set("query_server_config", "os_process_soft_limit", "2", false),
- ok = confirm_config("os_process_soft_limit", "2").
-confirm_config(Key, Value) ->
- confirm_config(Key, Value, 0).
-confirm_config(Key, Value, Count) ->
- case config:get("query_server_config", Key) of
- Value ->
- ok;
- _ when Count > 10 ->
- erlang:error({config_setup, [
- {module, ?MODULE},
- {line, ?LINE},
- {value, timeout}
- ]});
- _ ->
- %% we need to wait to let gen_server:cast finish
- timer:sleep(10),
- confirm_config(Key, Value, Count + 1)
- end.
+ ok.
+teardown_config() ->
+ config:delete("native_query_servers", "enable_erlang_query_server", false),
+ config:delete("query_server_config", "os_process_limit", false),
+ config:delete("query_server_config", "os_process_soft_limit", false),
+ config:delete("query_server_config", "db_tag", false),
+ ok.
spawn_client() ->
Parent = self(),
@@ -299,13 +539,13 @@ spawn_client() ->
{Pid, Ref}.
-spawn_client(DDocId) ->
+spawn_client(Db, DDocId) ->
Parent = self(),
Ref = make_ref(),
Pid = spawn(fun() ->
DDocKey = {DDocId, <<"1-abcdefgh">>},
DDoc = #doc{body={[{<<"language">>, <<"erlang">>}]}},
- Proc = couch_query_servers:get_ddoc_process(DDoc, DDocKey),
+ Proc = couch_query_servers:get_ddoc_process(DDoc, Db, DDocKey),
loop(Parent, Ref, Proc)
{Pid, Ref}.
@@ -332,20 +572,30 @@ get_client_proc({Pid, Ref}, ClientName) ->
stop_client({Pid, Ref}) ->
+ MRef = erlang:monitor(process, Pid),
Pid ! stop,
{stop, Ref} ->
+ receive
+ {'DOWN', MRef, process, Pid, _} -> ok
+ end,
after ?TIMEOUT ->
+ erlang:demonitor(MRef, [flush]),
kill_client({Pid, Ref}) ->
+ MRef = erlang:monitor(process, Pid),
Pid ! die,
{die, Ref} ->
+ receive
+ {'DOWN', MRef, process, Pid, _} -> ok
+ end,
after ?TIMEOUT ->
+ erlang:demonitor(MRef, [flush]),
@@ -364,3 +614,68 @@ loop(Parent, Ref, Proc) ->
Parent ! {die, Ref},
+proc_pids(Procs) ->
+ [ || P <- Procs].
+all_alive(Procs) ->
+ lists:all(fun is_process_alive/1, proc_pids(Procs)).
+all_different(Procs) ->
+ lists:usort(proc_pids(Procs)) =:= lists:sort(proc_pids(Procs)).
+all_alive_all_different(Procs) ->
+ all_alive(Procs) andalso all_different(Procs).
+idle_count() ->
+ ets:info(couch_proc_manager_idle_by_db, size).
+tagged_idle_count() ->
+ ets:info(couch_proc_manager_idle_access, size).
+stop_clients(Clients) ->
+ Fun = fun(C) -> ?assertEqual(ok, stop_client(C)) end,
+ lists:map(Fun, Clients).
+wait_tagged_idle_count(N) ->
+ WaitFun = fun() ->
+ case tagged_idle_count() == N of
+ true -> ok;
+ false -> wait
+ end
+ end,
+ case test_util:wait(WaitFun, 5000) of
+ timeout -> error(timeout);
+ _ -> ok
+ end,
+ ?assertEqual(N, tagged_idle_count()).
+wait_idle_count(N) ->
+ WaitFun = fun() ->
+ case idle_count() == N of
+ true -> ok;
+ false -> wait
+ end
+ end,
+ case test_util:wait(WaitFun, 5000) of
+ timeout -> error(timeout);
+ _ -> ok
+ end,
+ ?assertEqual(N, idle_count()).
+wait_process_count(N) ->
+ WaitFun = fun() ->
+ case couch_proc_manager:get_proc_count() == N of
+ true -> ok;
+ false -> wait
+ end
+ end,
+ case test_util:wait(WaitFun, 5000) of
+ timeout -> error(timeout);
+ _ -> ok
+ end,
+ ?assertEqual(N, couch_proc_manager:get_proc_count()).
+cfg_set(K, V) ->
+ config:set("query_server_config", K, V, false),
+ ok.
diff --git a/src/couch_mrview/src/couch_mrview_show.erl b/src/couch_mrview/src/couch_mrview_show.erl
index 0268b706e..16a75975a 100644
--- a/src/couch_mrview/src/couch_mrview_show.erl
+++ b/src/couch_mrview/src/couch_mrview_show.erl
@@ -77,7 +77,7 @@ handle_doc_show(Req, Db, DDoc, ShowName, Doc, DocId) ->
JsonReq = chttpd_external:json_req_obj(Req, Db, DocId),
JsonDoc = couch_query_servers:json_doc(Doc),
[<<"resp">>, ExternalResp] =
- couch_query_servers:ddoc_prompt(DDoc, [<<"shows">>, ShowName],
+ couch_query_servers:ddoc_prompt(Db, DDoc, [<<"shows">>, ShowName],
[JsonDoc, JsonReq]),
JsonResp = apply_etag(ExternalResp, CurrentEtag),
chttpd_external:send_external_response(Req, JsonResp)
@@ -122,7 +122,7 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) ->
JsonReq = chttpd_external:json_req_obj(Req, Db, DocId),
JsonDoc = couch_query_servers:json_doc(Doc),
Cmd = [<<"updates">>, UpdateName],
- UpdateResp = couch_query_servers:ddoc_prompt(DDoc, Cmd, [JsonDoc, JsonReq]),
+ UpdateResp = couch_query_servers:ddoc_prompt(Db, DDoc, Cmd, [JsonDoc, JsonReq]),
JsonResp = case UpdateResp of
[<<"up">>, {NewJsonDoc}, {JsonResp0}] ->
case chttpd:header_value(
@@ -196,7 +196,7 @@ handle_view_list(Req, Db, DDoc, LName, VDDoc, VName, Keys) ->
Args = Args0#mrargs{preflight_fun=ETagFun},
couch_httpd:etag_maybe(Req, fun() ->
- couch_query_servers:with_ddoc_proc(DDoc, fun(QServer) ->
+ couch_query_servers:with_ddoc_proc(Db, DDoc, fun(QServer) ->
Acc = #lacc{db=Db, req=Req, qserver=QServer, lname=LName},
case VName of
<<"_all_docs">> ->
diff --git a/src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl b/src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl
index 2182dead6..4cbbd382b 100644
--- a/src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl
+++ b/src/ddoc_cache/src/ddoc_cache_entry_validation_funs.erl
@@ -32,7 +32,7 @@ ddocid(_) ->
recover(DbName) ->
{ok, DDocs} = fabric:design_docs(mem3:dbname(DbName)),
Funs = lists:flatmap(fun(DDoc) ->
- case couch_doc:get_validate_doc_fun(DDoc) of
+ case couch_doc:get_validate_doc_fun(DbName, DDoc) of
nil -> [];
Fun -> [Fun]