summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Filipe David Borba Manana <fdmanana@apache.org> 2011-11-06 14:25:04 +0000
committer: Filipe David Borba Manana <fdmanana@apache.org> 2011-11-16 11:55:28 +0000
commit: a851c6e5150d14221ca018587d76214856c1555a (patch)
tree: d07b19eafe95db03f93da8cf13ba04edc14ea9bc
parent: 72992642d2a5f15b417736e019ef2b6437300fc9 (diff)
download: couchdb-a851c6e5150d14221ca018587d76214856c1555a.tar.gz
More efficient communication with the view server
This change makes communication between the Erlang VM and an external view server (e.g. couchjs) more efficient: a series of commands is first written to the port, and all the responses from the external view server are read only after those writes are done. This minimizes the amount of time each endpoint spends blocked reading from the port. COUCHDB-1334
-rw-r--r-- src/couch_mrview/src/couch_mrview_updater.erl 46
-rw-r--r-- src/couchdb/couch_native_process.erl 11
-rw-r--r-- src/couchdb/couch_os_process.erl 38
-rw-r--r-- src/couchdb/couch_query_servers.erl 17
4 files changed, 87 insertions, 25 deletions
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 9604ea954..3014664ea 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -130,32 +130,42 @@ map_docs(Parent, State0) ->
couch_query_servers:stop_doc_map(State0#mrst.qserver),
couch_work_queue:close(State0#mrst.write_queue);
{ok, Dequeued} ->
- % Run all the non deleted docs through the view engine and
- % then pass the results on to the writer process.
State1 = case State0#mrst.qserver of
nil -> start_query_server(State0);
_ -> State0
end,
- QServer = State1#mrst.qserver,
- DocFun = fun
- ({nil, Seq, _}, {SeqAcc, Results}) ->
- {erlang:max(Seq, SeqAcc), Results};
- ({Id, Seq, deleted}, {SeqAcc, Results}) ->
- {erlang:max(Seq, SeqAcc), [{Id, []} | Results]};
- ({Id, Seq, Doc}, {SeqAcc, Results}) ->
- {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
- {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]}
- end,
- FoldFun = fun(Docs, Acc) ->
- update_task(length(Docs)),
- lists:foldl(DocFun, Acc, Docs)
- end,
- Results = lists:foldl(FoldFun, {0, []}, Dequeued),
- couch_work_queue:queue(State1#mrst.write_queue, Results),
+ {ok, MapResults} = compute_map_results(State1, Dequeued),
+ couch_work_queue:queue(State1#mrst.write_queue, MapResults),
map_docs(Parent, State1)
end.
+compute_map_results(#mrst{qserver = Qs}, Dequeued) ->
+ % Run all the non deleted docs through the view engine and
+ % then pass the results on to the writer process.
+ DocFun = fun
+ ({nil, Seq, _}, {SeqAcc, AccDel, AccNotDel}) ->
+ {erlang:max(Seq, SeqAcc), AccDel, AccNotDel};
+ ({Id, Seq, deleted}, {SeqAcc, AccDel, AccNotDel}) ->
+ {erlang:max(Seq, SeqAcc), [{Id, []} | AccDel], AccNotDel};
+ ({_Id, Seq, Doc}, {SeqAcc, AccDel, AccNotDel}) ->
+ {erlang:max(Seq, SeqAcc), AccDel, [Doc | AccNotDel]}
+ end,
+ FoldFun = fun(Docs, Acc) ->
+ lists:foldl(DocFun, Acc, Docs)
+ end,
+ {MaxSeq, DeletedResults, Docs} =
+ lists:foldl(FoldFun, {0, [], []}, Dequeued),
+ {ok, MapResultList} = couch_query_servers:map_docs_raw(Qs, Docs),
+ NotDeletedResults = lists:zipwith(
+ fun(#doc{id = Id}, MapResults) -> {Id, MapResults} end,
+ Docs,
+ MapResultList),
+ AllMapResults = DeletedResults ++ NotDeletedResults,
+ update_task(length(AllMapResults)),
+ {ok, {MaxSeq, AllMapResults}}.
+
+
write_results(Parent, State) ->
case couch_work_queue:dequeue(State#mrst.write_queue) of
closed ->
diff --git a/src/couchdb/couch_native_process.erl b/src/couchdb/couch_native_process.erl
index 5a32e75e7..b1d51ed39 100644
--- a/src/couchdb/couch_native_process.erl
+++ b/src/couchdb/couch_native_process.erl
@@ -42,7 +42,7 @@
-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
handle_info/2]).
--export([set_timeout/2, prompt/2]).
+-export([set_timeout/2, prompt/2, prompt_many/2]).
-define(STATE, native_proc_state).
-record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}).
@@ -62,6 +62,15 @@ set_timeout(Pid, TimeOut) ->
prompt(Pid, Data) when is_list(Data) ->
gen_server:call(Pid, {prompt, Data}).
+prompt_many(Pid, DataList) ->
+ prompt_many(Pid, DataList, []).
+
+prompt_many(_Pid, [], Acc) ->
+ {ok, lists:reverse(Acc)};
+prompt_many(Pid, [Data | Rest], Acc) ->
+ Result = prompt(Pid, Data),
+ prompt_many(Pid, Rest, [Result | Acc]).
+
% gen_server callbacks
init([]) ->
{ok, #evstate{ddocs=dict:new()}}.
diff --git a/src/couchdb/couch_os_process.erl b/src/couchdb/couch_os_process.erl
index db62d499e..3a267be21 100644
--- a/src/couchdb/couch_os_process.erl
+++ b/src/couchdb/couch_os_process.erl
@@ -14,7 +14,7 @@
-behaviour(gen_server).
-export([start_link/1, start_link/2, start_link/3, stop/1]).
--export([set_timeout/2, prompt/2]).
+-export([set_timeout/2, prompt/2, prompt_many/2]).
-export([send/2, writeline/2, readline/1, writejson/2, readjson/1]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
@@ -57,6 +57,40 @@ prompt(Pid, Data) ->
throw(Error)
end.
+prompt_many(Pid, DataList) ->
+ OsProc = gen_server:call(Pid, get_os_proc, infinity),
+ true = port_connect(OsProc#os_proc.port, self()),
+ try
+ send_many(OsProc, DataList),
+ receive_many(length(DataList), OsProc, [])
+ after
+ % Can throw badarg error, when OsProc Pid is dead or port was closed
+ % by the readline function on error/timeout.
+ (catch port_connect(OsProc#os_proc.port, Pid)),
+ unlink(OsProc#os_proc.port),
+ drop_port_messages(OsProc#os_proc.port)
+ end.
+
+send_many(_OsProc, []) ->
+ ok;
+send_many(#os_proc{writer = Writer} = OsProc, [Data | Rest]) ->
+ Writer(OsProc, Data),
+ send_many(OsProc, Rest).
+
+receive_many(0, _OsProc, Acc) ->
+ {ok, lists:reverse(Acc)};
+receive_many(N, #os_proc{reader = Reader} = OsProc, Acc) ->
+ Line = Reader(OsProc),
+ receive_many(N - 1, OsProc, [Line | Acc]).
+
+drop_port_messages(Port) ->
+ receive
+ {Port, _} ->
+ drop_port_messages(Port)
+ after 0 ->
+ ok
+ end.
+
% Utility functions for reading and writing
% in custom functions
writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
@@ -175,6 +209,8 @@ terminate(_Reason, #os_proc{port=Port}) ->
catch port_close(Port),
ok.
+handle_call(get_os_proc, _From, OsProc) ->
+ {reply, OsProc, OsProc};
handle_call({set_timeout, TimeOut}, _From, OsProc) ->
{reply, ok, OsProc#os_proc{timeout=TimeOut}};
handle_call({prompt, Data}, _From, OsProc) ->
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index 03f201254..c9c2bc676 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -16,7 +16,7 @@
-export([start_link/0, config_change/1]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
--export([start_doc_map/3, map_docs/2, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]).
+-export([start_doc_map/3, map_docs/2, map_docs_raw/2, stop_doc_map/1, raw_to_ejson/1]).
-export([reduce/3, rereduce/3,validate_doc_update/5]).
-export([filter_docs/5]).
-export([filter_view/3]).
@@ -33,6 +33,7 @@
lang,
ddoc_keys = [],
prompt_fun,
+ prompt_many_fun,
set_timeout_fun,
stop_fun
}).
@@ -83,10 +84,15 @@ map_docs(Proc, Docs) ->
Docs),
{ok, Results}.
-map_doc_raw(Proc, Doc) ->
- Json = couch_doc:to_json_obj(Doc, []),
- {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}.
-
+map_docs_raw(Proc, DocList) ->
+ {Mod, Fun} = Proc#proc.prompt_many_fun,
+ CommandList = lists:map(
+ fun(Doc) ->
+ EJson = couch_doc:to_json_obj(Doc, []),
+ [<<"map_doc">>, EJson]
+ end,
+ DocList),
+ Mod:Fun(Proc#proc.pid, CommandList).
stop_doc_map(nil) ->
ok;
@@ -479,6 +485,7 @@ new_process(Langs, LangLimits, Lang) ->
pid=Pid,
% Called via proc_prompt, proc_set_timeout, and proc_stop
prompt_fun={Mod, prompt},
+ prompt_many_fun={Mod, prompt_many},
set_timeout_fun={Mod, set_timeout},
stop_fun={Mod, stop}}};
_ ->