diff options
author | Filipe David Borba Manana <fdmanana@apache.org> | 2011-11-06 14:25:04 +0000 |
---|---|---|
committer | Filipe David Borba Manana <fdmanana@apache.org> | 2011-11-16 11:55:28 +0000 |
commit | a851c6e5150d14221ca018587d76214856c1555a (patch) | |
tree | d07b19eafe95db03f93da8cf13ba04edc14ea9bc | |
parent | 72992642d2a5f15b417736e019ef2b6437300fc9 (diff) | |
download | couchdb-a851c6e5150d14221ca018587d76214856c1555a.tar.gz |
More efficient communication with the view server
This change makes the communication between the Erlang VM and
an external view server (e.g. couchjs) more efficient by
writing a series of commands into the port and reading all the
responses from the external view server after doing all those
writes. This minimizes the amount of time each endpoint spends
blocked reading from the port.
COUCHDB-1334
-rw-r--r-- | src/couch_mrview/src/couch_mrview_updater.erl | 46 | ||||
-rw-r--r-- | src/couchdb/couch_native_process.erl | 11 | ||||
-rw-r--r-- | src/couchdb/couch_os_process.erl | 38 | ||||
-rw-r--r-- | src/couchdb/couch_query_servers.erl | 17 |
4 files changed, 87 insertions, 25 deletions
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl index 9604ea954..3014664ea 100644 --- a/src/couch_mrview/src/couch_mrview_updater.erl +++ b/src/couch_mrview/src/couch_mrview_updater.erl @@ -130,32 +130,42 @@ map_docs(Parent, State0) -> couch_query_servers:stop_doc_map(State0#mrst.qserver), couch_work_queue:close(State0#mrst.write_queue); {ok, Dequeued} -> - % Run all the non deleted docs through the view engine and - % then pass the results on to the writer process. State1 = case State0#mrst.qserver of nil -> start_query_server(State0); _ -> State0 end, - QServer = State1#mrst.qserver, - DocFun = fun - ({nil, Seq, _}, {SeqAcc, Results}) -> - {erlang:max(Seq, SeqAcc), Results}; - ({Id, Seq, deleted}, {SeqAcc, Results}) -> - {erlang:max(Seq, SeqAcc), [{Id, []} | Results]}; - ({Id, Seq, Doc}, {SeqAcc, Results}) -> - {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc), - {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]} - end, - FoldFun = fun(Docs, Acc) -> - update_task(length(Docs)), - lists:foldl(DocFun, Acc, Docs) - end, - Results = lists:foldl(FoldFun, {0, []}, Dequeued), - couch_work_queue:queue(State1#mrst.write_queue, Results), + {ok, MapResults} = compute_map_results(State1, Dequeued), + couch_work_queue:queue(State1#mrst.write_queue, MapResults), map_docs(Parent, State1) end. +compute_map_results(#mrst{qserver = Qs}, Dequeued) -> + % Run all the non deleted docs through the view engine and + % then pass the results on to the writer process. 
+ DocFun = fun + ({nil, Seq, _}, {SeqAcc, AccDel, AccNotDel}) -> + {erlang:max(Seq, SeqAcc), AccDel, AccNotDel}; + ({Id, Seq, deleted}, {SeqAcc, AccDel, AccNotDel}) -> + {erlang:max(Seq, SeqAcc), [{Id, []} | AccDel], AccNotDel}; + ({_Id, Seq, Doc}, {SeqAcc, AccDel, AccNotDel}) -> + {erlang:max(Seq, SeqAcc), AccDel, [Doc | AccNotDel]} + end, + FoldFun = fun(Docs, Acc) -> + lists:foldl(DocFun, Acc, Docs) + end, + {MaxSeq, DeletedResults, Docs} = + lists:foldl(FoldFun, {0, [], []}, Dequeued), + {ok, MapResultList} = couch_query_servers:map_docs_raw(Qs, Docs), + NotDeletedResults = lists:zipwith( + fun(#doc{id = Id}, MapResults) -> {Id, MapResults} end, + Docs, + MapResultList), + AllMapResults = DeletedResults ++ NotDeletedResults, + update_task(length(AllMapResults)), + {ok, {MaxSeq, AllMapResults}}. + + write_results(Parent, State) -> case couch_work_queue:dequeue(State#mrst.write_queue) of closed -> diff --git a/src/couchdb/couch_native_process.erl b/src/couchdb/couch_native_process.erl index 5a32e75e7..b1d51ed39 100644 --- a/src/couchdb/couch_native_process.erl +++ b/src/couchdb/couch_native_process.erl @@ -42,7 +42,7 @@ -export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3, handle_info/2]). --export([set_timeout/2, prompt/2]). +-export([set_timeout/2, prompt/2, prompt_many/2]). -define(STATE, native_proc_state). -record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}). @@ -62,6 +62,15 @@ set_timeout(Pid, TimeOut) -> prompt(Pid, Data) when is_list(Data) -> gen_server:call(Pid, {prompt, Data}). +prompt_many(Pid, DataList) -> + prompt_many(Pid, DataList, []). + +prompt_many(_Pid, [], Acc) -> + {ok, lists:reverse(Acc)}; +prompt_many(Pid, [Data | Rest], Acc) -> + Result = prompt(Pid, Data), + prompt_many(Pid, Rest, [Result | Acc]). + % gen_server callbacks init([]) -> {ok, #evstate{ddocs=dict:new()}}. 
diff --git a/src/couchdb/couch_os_process.erl b/src/couchdb/couch_os_process.erl index db62d499e..3a267be21 100644 --- a/src/couchdb/couch_os_process.erl +++ b/src/couchdb/couch_os_process.erl @@ -14,7 +14,7 @@ -behaviour(gen_server). -export([start_link/1, start_link/2, start_link/3, stop/1]). --export([set_timeout/2, prompt/2]). +-export([set_timeout/2, prompt/2, prompt_many/2]). -export([send/2, writeline/2, readline/1, writejson/2, readjson/1]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]). @@ -57,6 +57,40 @@ prompt(Pid, Data) -> throw(Error) end. +prompt_many(Pid, DataList) -> + OsProc = gen_server:call(Pid, get_os_proc, infinity), + true = port_connect(OsProc#os_proc.port, self()), + try + send_many(OsProc, DataList), + receive_many(length(DataList), OsProc, []) + after + % Can throw badarg error, when OsProc Pid is dead or port was closed + % by the readline function on error/timeout. + (catch port_connect(OsProc#os_proc.port, Pid)), + unlink(OsProc#os_proc.port), + drop_port_messages(OsProc#os_proc.port) + end. + +send_many(_OsProc, []) -> + ok; +send_many(#os_proc{writer = Writer} = OsProc, [Data | Rest]) -> + Writer(OsProc, Data), + send_many(OsProc, Rest). + +receive_many(0, _OsProc, Acc) -> + {ok, lists:reverse(Acc)}; +receive_many(N, #os_proc{reader = Reader} = OsProc, Acc) -> + Line = Reader(OsProc), + receive_many(N - 1, OsProc, [Line | Acc]). + +drop_port_messages(Port) -> + receive + {Port, _} -> + drop_port_messages(Port) + after 0 -> + ok + end. + % Utility functions for reading and writing % in custom functions writeline(OsProc, Data) when is_record(OsProc, os_proc) -> @@ -175,6 +209,8 @@ terminate(_Reason, #os_proc{port=Port}) -> catch port_close(Port), ok. 
+handle_call(get_os_proc, _From, OsProc) -> + {reply, OsProc, OsProc}; handle_call({set_timeout, TimeOut}, _From, OsProc) -> {reply, ok, OsProc#os_proc{timeout=TimeOut}}; handle_call({prompt, Data}, _From, OsProc) -> diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index 03f201254..c9c2bc676 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -16,7 +16,7 @@ -export([start_link/0, config_change/1]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]). --export([start_doc_map/3, map_docs/2, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]). +-export([start_doc_map/3, map_docs/2, map_docs_raw/2, stop_doc_map/1, raw_to_ejson/1]). -export([reduce/3, rereduce/3,validate_doc_update/5]). -export([filter_docs/5]). -export([filter_view/3]). @@ -33,6 +33,7 @@ lang, ddoc_keys = [], prompt_fun, + prompt_many_fun, set_timeout_fun, stop_fun }). @@ -83,10 +84,15 @@ map_docs(Proc, Docs) -> Docs), {ok, Results}. -map_doc_raw(Proc, Doc) -> - Json = couch_doc:to_json_obj(Doc, []), - {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}. - +map_docs_raw(Proc, DocList) -> + {Mod, Fun} = Proc#proc.prompt_many_fun, + CommandList = lists:map( + fun(Doc) -> + EJson = couch_doc:to_json_obj(Doc, []), + [<<"map_doc">>, EJson] + end, + DocList), + Mod:Fun(Proc#proc.pid, CommandList). stop_doc_map(nil) -> ok; @@ -479,6 +485,7 @@ new_process(Langs, LangLimits, Lang) -> pid=Pid, % Called via proc_prompt, proc_set_timeout, and proc_stop prompt_fun={Mod, prompt}, + prompt_many_fun={Mod, prompt_many}, set_timeout_fun={Mod, set_timeout}, stop_fun={Mod, stop}}}; _ -> |