authorDave Cottlehuber <>2012-12-12 21:37:18 +0100
committerDave Cottlehuber <>2013-07-23 13:14:33 +0200
commit6ffc52796182a8d3673f3648c59c7e9abddf4fa2 (patch)
parent635022b27cf72efe82bc30f56393070f2b842615 (diff)
COUCHDB-1334 - revert "More efficient communication with the view server"1334-revert-feature-view-server-pipelining
This reverts commit a851c6e - COUCHDB-1334 breaks with Windows + couchjs in unexplained ways - reducing to 1 concurrent query server is not sufficient - Testing with open_port options overlapped_io was not in itself sufficient - find overlapped_io - Refer history in COUCHDB-1346
4 files changed, 25 insertions, 87 deletions
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 3014664ea..9604ea954 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -130,42 +130,32 @@ map_docs(Parent, State0) ->
{ok, Dequeued} ->
+ % Run all the non deleted docs through the view engine and
+ % then pass the results on to the writer process.
State1 = case State0#mrst.qserver of
nil -> start_query_server(State0);
_ -> State0
- {ok, MapResults} = compute_map_results(State1, Dequeued),
- couch_work_queue:queue(State1#mrst.write_queue, MapResults),
+ QServer = State1#mrst.qserver,
+ DocFun = fun
+ ({nil, Seq, _}, {SeqAcc, Results}) ->
+ {erlang:max(Seq, SeqAcc), Results};
+ ({Id, Seq, deleted}, {SeqAcc, Results}) ->
+ {erlang:max(Seq, SeqAcc), [{Id, []} | Results]};
+ ({Id, Seq, Doc}, {SeqAcc, Results}) ->
+ {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
+ {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]}
+ end,
+ FoldFun = fun(Docs, Acc) ->
+ update_task(length(Docs)),
+ lists:foldl(DocFun, Acc, Docs)
+ end,
+ Results = lists:foldl(FoldFun, {0, []}, Dequeued),
+ couch_work_queue:queue(State1#mrst.write_queue, Results),
map_docs(Parent, State1)
-compute_map_results(#mrst{qserver = Qs}, Dequeued) ->
- % Run all the non deleted docs through the view engine and
- % then pass the results on to the writer process.
- DocFun = fun
- ({nil, Seq, _}, {SeqAcc, AccDel, AccNotDel}) ->
- {erlang:max(Seq, SeqAcc), AccDel, AccNotDel};
- ({Id, Seq, deleted}, {SeqAcc, AccDel, AccNotDel}) ->
- {erlang:max(Seq, SeqAcc), [{Id, []} | AccDel], AccNotDel};
- ({_Id, Seq, Doc}, {SeqAcc, AccDel, AccNotDel}) ->
- {erlang:max(Seq, SeqAcc), AccDel, [Doc | AccNotDel]}
- end,
- FoldFun = fun(Docs, Acc) ->
- lists:foldl(DocFun, Acc, Docs)
- end,
- {MaxSeq, DeletedResults, Docs} =
- lists:foldl(FoldFun, {0, [], []}, Dequeued),
- {ok, MapResultList} = couch_query_servers:map_docs_raw(Qs, Docs),
- NotDeletedResults = lists:zipwith(
- fun(#doc{id = Id}, MapResults) -> {Id, MapResults} end,
- Docs,
- MapResultList),
- AllMapResults = DeletedResults ++ NotDeletedResults,
- update_task(length(AllMapResults)),
- {ok, {MaxSeq, AllMapResults}}.
write_results(Parent, State) ->
case couch_work_queue:dequeue(State#mrst.write_queue) of
closed ->
diff --git a/src/couchdb/couch_native_process.erl b/src/couchdb/couch_native_process.erl
index b1d51ed39..5a32e75e7 100644
--- a/src/couchdb/couch_native_process.erl
+++ b/src/couchdb/couch_native_process.erl
@@ -42,7 +42,7 @@
--export([set_timeout/2, prompt/2, prompt_many/2]).
+-export([set_timeout/2, prompt/2]).
-define(STATE, native_proc_state).
-record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}).
@@ -62,15 +62,6 @@ set_timeout(Pid, TimeOut) ->
prompt(Pid, Data) when is_list(Data) ->
gen_server:call(Pid, {prompt, Data}).
-prompt_many(Pid, DataList) ->
- prompt_many(Pid, DataList, []).
-prompt_many(_Pid, [], Acc) ->
- {ok, lists:reverse(Acc)};
-prompt_many(Pid, [Data | Rest], Acc) ->
- Result = prompt(Pid, Data),
- prompt_many(Pid, Rest, [Result | Acc]).
% gen_server callbacks
init([]) ->
{ok, #evstate{ddocs=dict:new()}}.
diff --git a/src/couchdb/couch_os_process.erl b/src/couchdb/couch_os_process.erl
index 3a267be21..db62d499e 100644
--- a/src/couchdb/couch_os_process.erl
+++ b/src/couchdb/couch_os_process.erl
@@ -14,7 +14,7 @@
-export([start_link/1, start_link/2, start_link/3, stop/1]).
--export([set_timeout/2, prompt/2, prompt_many/2]).
+-export([set_timeout/2, prompt/2]).
-export([send/2, writeline/2, readline/1, writejson/2, readjson/1]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
@@ -57,40 +57,6 @@ prompt(Pid, Data) ->
-prompt_many(Pid, DataList) ->
- OsProc = gen_server:call(Pid, get_os_proc, infinity),
- true = port_connect(OsProc#os_proc.port, self()),
- try
- send_many(OsProc, DataList),
- receive_many(length(DataList), OsProc, [])
- after
- % Can throw badarg error, when OsProc Pid is dead or port was closed
- % by the readline function on error/timeout.
- (catch port_connect(OsProc#os_proc.port, Pid)),
- unlink(OsProc#os_proc.port),
- drop_port_messages(OsProc#os_proc.port)
- end.
-send_many(_OsProc, []) ->
- ok;
-send_many(#os_proc{writer = Writer} = OsProc, [Data | Rest]) ->
- Writer(OsProc, Data),
- send_many(OsProc, Rest).
-receive_many(0, _OsProc, Acc) ->
- {ok, lists:reverse(Acc)};
-receive_many(N, #os_proc{reader = Reader} = OsProc, Acc) ->
- Line = Reader(OsProc),
- receive_many(N - 1, OsProc, [Line | Acc]).
-drop_port_messages(Port) ->
- receive
- {Port, _} ->
- drop_port_messages(Port)
- after 0 ->
- ok
- end.
% Utility functions for reading and writing
% in custom functions
writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
@@ -209,8 +175,6 @@ terminate(_Reason, #os_proc{port=Port}) ->
catch port_close(Port),
-handle_call(get_os_proc, _From, OsProc) ->
- {reply, OsProc, OsProc};
handle_call({set_timeout, TimeOut}, _From, OsProc) ->
{reply, ok, OsProc#os_proc{timeout=TimeOut}};
handle_call({prompt, Data}, _From, OsProc) ->
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index e29f23b92..3b58cbe9c 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -16,7 +16,7 @@
-export([start_link/0, config_change/1]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
--export([start_doc_map/3, map_docs/2, map_docs_raw/2, stop_doc_map/1, raw_to_ejson/1]).
+-export([start_doc_map/3, map_docs/2, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]).
-export([reduce/3, rereduce/3,validate_doc_update/5]).
@@ -33,7 +33,6 @@
ddoc_keys = [],
- prompt_many_fun,
@@ -84,15 +83,10 @@ map_docs(Proc, Docs) ->
{ok, Results}.
-map_docs_raw(Proc, DocList) ->
- {Mod, Fun} = Proc#proc.prompt_many_fun,
- CommandList = lists:map(
- fun(Doc) ->
- EJson = couch_doc:to_json_obj(Doc, []),
- [<<"map_doc">>, EJson]
- end,
- DocList),
- Mod:Fun(, CommandList).
+map_doc_raw(Proc, Doc) ->
+ Json = couch_doc:to_json_obj(Doc, []),
+ {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}.
stop_doc_map(nil) ->
@@ -487,7 +481,6 @@ new_process(Langs, LangLimits, Lang) ->
% Called via proc_prompt, proc_set_timeout, and proc_stop
prompt_fun={Mod, prompt},
- prompt_many_fun={Mod, prompt_many},
set_timeout_fun={Mod, set_timeout},
stop_fun={Mod, stop}}};
_ ->