From 6ffc52796182a8d3673f3648c59c7e9abddf4fa2 Mon Sep 17 00:00:00 2001 From: Dave Cottlehuber Date: Wed, 12 Dec 2012 21:37:18 +0100 Subject: COUCHDB-1334 - revert "More efficient communication with the view server" This reverts commit a851c6e - COUCHDB-1334 breaks with Windows + couchjs in unexplained ways - reducing to 1 concurrent query server is not sufficient - Testing with open_port options overlapped_io was not in itself sufficient - http://erlang.org/doc/man/erlang.html find overlapped_io - Refer history in COUCHDB-1346 --- src/couch_mrview/src/couch_mrview_updater.erl | 46 +++++++++++---------------- src/couchdb/couch_native_process.erl | 11 +------ src/couchdb/couch_os_process.erl | 38 +--------------------- src/couchdb/couch_query_servers.erl | 17 +++------- 4 files changed, 25 insertions(+), 87 deletions(-) diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl index 3014664ea..9604ea954 100644 --- a/src/couch_mrview/src/couch_mrview_updater.erl +++ b/src/couch_mrview/src/couch_mrview_updater.erl @@ -130,42 +130,32 @@ map_docs(Parent, State0) -> couch_query_servers:stop_doc_map(State0#mrst.qserver), couch_work_queue:close(State0#mrst.write_queue); {ok, Dequeued} -> + % Run all the non deleted docs through the view engine and + % then pass the results on to the writer process. State1 = case State0#mrst.qserver of nil -> start_query_server(State0); _ -> State0 end, - {ok, MapResults} = compute_map_results(State1, Dequeued), - couch_work_queue:queue(State1#mrst.write_queue, MapResults), + QServer = State1#mrst.qserver, + DocFun = fun + ({nil, Seq, _}, {SeqAcc, Results}) -> + {erlang:max(Seq, SeqAcc), Results}; + ({Id, Seq, deleted}, {SeqAcc, Results}) -> + {erlang:max(Seq, SeqAcc), [{Id, []} | Results]}; + ({Id, Seq, Doc}, {SeqAcc, Results}) -> + {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc), + {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]} + end, + FoldFun = fun(Docs, Acc) -> + update_task(length(Docs)), + lists:foldl(DocFun, Acc, Docs) + end, + Results = lists:foldl(FoldFun, {0, []}, Dequeued), + couch_work_queue:queue(State1#mrst.write_queue, Results), map_docs(Parent, State1) end. -compute_map_results(#mrst{qserver = Qs}, Dequeued) -> - % Run all the non deleted docs through the view engine and - % then pass the results on to the writer process. - DocFun = fun - ({nil, Seq, _}, {SeqAcc, AccDel, AccNotDel}) -> - {erlang:max(Seq, SeqAcc), AccDel, AccNotDel}; - ({Id, Seq, deleted}, {SeqAcc, AccDel, AccNotDel}) -> - {erlang:max(Seq, SeqAcc), [{Id, []} | AccDel], AccNotDel}; - ({_Id, Seq, Doc}, {SeqAcc, AccDel, AccNotDel}) -> - {erlang:max(Seq, SeqAcc), AccDel, [Doc | AccNotDel]} - end, - FoldFun = fun(Docs, Acc) -> - lists:foldl(DocFun, Acc, Docs) - end, - {MaxSeq, DeletedResults, Docs} = - lists:foldl(FoldFun, {0, [], []}, Dequeued), - {ok, MapResultList} = couch_query_servers:map_docs_raw(Qs, Docs), - NotDeletedResults = lists:zipwith( - fun(#doc{id = Id}, MapResults) -> {Id, MapResults} end, - Docs, - MapResultList), - AllMapResults = DeletedResults ++ NotDeletedResults, - update_task(length(AllMapResults)), - {ok, {MaxSeq, AllMapResults}}. - - write_results(Parent, State) -> case couch_work_queue:dequeue(State#mrst.write_queue) of closed -> diff --git a/src/couchdb/couch_native_process.erl b/src/couchdb/couch_native_process.erl index b1d51ed39..5a32e75e7 100644 --- a/src/couchdb/couch_native_process.erl +++ b/src/couchdb/couch_native_process.erl @@ -42,7 +42,7 @@ -export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3, handle_info/2]). --export([set_timeout/2, prompt/2, prompt_many/2]). +-export([set_timeout/2, prompt/2]). -define(STATE, native_proc_state). -record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}). @@ -62,15 +62,6 @@ set_timeout(Pid, TimeOut) -> prompt(Pid, Data) when is_list(Data) -> gen_server:call(Pid, {prompt, Data}). -prompt_many(Pid, DataList) -> - prompt_many(Pid, DataList, []). - -prompt_many(_Pid, [], Acc) -> - {ok, lists:reverse(Acc)}; -prompt_many(Pid, [Data | Rest], Acc) -> - Result = prompt(Pid, Data), - prompt_many(Pid, Rest, [Result | Acc]). - % gen_server callbacks init([]) -> {ok, #evstate{ddocs=dict:new()}}. diff --git a/src/couchdb/couch_os_process.erl b/src/couchdb/couch_os_process.erl index 3a267be21..db62d499e 100644 --- a/src/couchdb/couch_os_process.erl +++ b/src/couchdb/couch_os_process.erl @@ -14,7 +14,7 @@ -behaviour(gen_server). -export([start_link/1, start_link/2, start_link/3, stop/1]). --export([set_timeout/2, prompt/2, prompt_many/2]). +-export([set_timeout/2, prompt/2]). -export([send/2, writeline/2, readline/1, writejson/2, readjson/1]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]). @@ -57,40 +57,6 @@ prompt(Pid, Data) -> throw(Error) end. -prompt_many(Pid, DataList) -> - OsProc = gen_server:call(Pid, get_os_proc, infinity), - true = port_connect(OsProc#os_proc.port, self()), - try - send_many(OsProc, DataList), - receive_many(length(DataList), OsProc, []) - after - % Can throw badarg error, when OsProc Pid is dead or port was closed - % by the readline function on error/timeout. - (catch port_connect(OsProc#os_proc.port, Pid)), - unlink(OsProc#os_proc.port), - drop_port_messages(OsProc#os_proc.port) - end. - -send_many(_OsProc, []) -> - ok; -send_many(#os_proc{writer = Writer} = OsProc, [Data | Rest]) -> - Writer(OsProc, Data), - send_many(OsProc, Rest). - -receive_many(0, _OsProc, Acc) -> - {ok, lists:reverse(Acc)}; -receive_many(N, #os_proc{reader = Reader} = OsProc, Acc) -> - Line = Reader(OsProc), - receive_many(N - 1, OsProc, [Line | Acc]). - -drop_port_messages(Port) -> - receive - {Port, _} -> - drop_port_messages(Port) - after 0 -> - ok - end. - % Utility functions for reading and writing % in custom functions writeline(OsProc, Data) when is_record(OsProc, os_proc) -> @@ -209,8 +175,6 @@ terminate(_Reason, #os_proc{port=Port}) -> catch port_close(Port), ok. -handle_call(get_os_proc, _From, OsProc) -> - {reply, OsProc, OsProc}; handle_call({set_timeout, TimeOut}, _From, OsProc) -> {reply, ok, OsProc#os_proc{timeout=TimeOut}}; handle_call({prompt, Data}, _From, OsProc) -> diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index e29f23b92..3b58cbe9c 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -16,7 +16,7 @@ -export([start_link/0, config_change/1]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]). --export([start_doc_map/3, map_docs/2, map_docs_raw/2, stop_doc_map/1, raw_to_ejson/1]). +-export([start_doc_map/3, map_docs/2, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]). -export([reduce/3, rereduce/3,validate_doc_update/5]). -export([filter_docs/5]). -export([filter_view/3]). @@ -33,7 +33,6 @@ lang, ddoc_keys = [], prompt_fun, - prompt_many_fun, set_timeout_fun, stop_fun }). @@ -84,15 +83,10 @@ map_docs(Proc, Docs) -> Docs), {ok, Results}. -map_docs_raw(Proc, DocList) -> - {Mod, Fun} = Proc#proc.prompt_many_fun, - CommandList = lists:map( - fun(Doc) -> - EJson = couch_doc:to_json_obj(Doc, []), - [<<"map_doc">>, EJson] - end, - DocList), - Mod:Fun(Proc#proc.pid, CommandList). +map_doc_raw(Proc, Doc) -> + Json = couch_doc:to_json_obj(Doc, []), + {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}. + stop_doc_map(nil) -> ok; @@ -487,7 +481,6 @@ new_process(Langs, LangLimits, Lang) -> pid=Pid, % Called via proc_prompt, proc_set_timeout, and proc_stop prompt_fun={Mod, prompt}, - prompt_many_fun={Mod, prompt_many}, set_timeout_fun={Mod, set_timeout}, stop_fun={Mod, stop}}}; _ -> -- cgit v1.2.1