diff options
-rw-r--r-- | src/chttpd/src/chttpd_httpd_handlers.erl | 1 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_fabric.erl | 155 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_fabric_rpc.erl | 97 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_httpd.erl | 77 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_httpd_util.erl | 201 |
5 files changed, 517 insertions, 14 deletions
diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl index b91aae9cc..9c3044126 100644 --- a/src/chttpd/src/chttpd_httpd_handlers.erl +++ b/src/chttpd/src/chttpd_httpd_handlers.erl @@ -19,6 +19,7 @@ url_handler(<<"favicon.ico">>) -> fun chttpd_misc:handle_favicon_req/1; url_handler(<<"_utils">>) -> fun chttpd_misc:handle_utils_dir_req/1; url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1; url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1; +url_handler(<<"_scheduler">>) -> fun couch_replicator_httpd:handle_scheduler_req/1; url_handler(<<"_node">>) -> fun chttpd_misc:handle_node_req/1; url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1; url_handler(<<"_replicate">>) -> fun chttpd_misc:handle_replicate_req/1; diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl new file mode 100644 index 000000000..6998b2803 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_fabric.erl @@ -0,0 +1,155 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_fabric). + +-export([ + docs/5 +]). + +-include_lib("fabric/include/fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). 
+
+%% Coordinator for the replicator docs listing. Submits a
+%% couch_replicator_fabric_rpc:docs/3 job for every shard returned by
+%% mem3:shards(DbName), waits for the worker streams to start and then hands
+%% the started workers to docs_int/5. A stream-start timeout or error is
+%% reported to the caller as Callback({error, Reason}, Acc). The rexi monitor
+%% is stopped on every exit path, and started workers are cleaned up whether
+%% docs_int/5 returns or raises.
+docs(DbName, Options, QueryArgs, Callback, Acc) ->
+    Shards = mem3:shards(DbName),
+    Workers0 = fabric_util:submit_jobs(
+           Shards, couch_replicator_fabric_rpc, docs, [Options, QueryArgs]),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    docs_int(DbName, Workers, QueryArgs, Callback, Acc)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            {timeout, NewState} ->
+                % Log whatever remove_done_workers/2 leaves behind, then
+                % report the timeout to the caller
+                DefunctWorkers = fabric_util:remove_done_workers(
+                    NewState#stream_acc.workers, waiting
+                ),
+                fabric_util:log_timeout(
+                    DefunctWorkers,
+                    "replicator docs"
+                ),
+                Callback({error, timeout}, Acc);
+            {error, Error} ->
+                Callback({error, Error}, Acc)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+
+%% Build the #collector state (skip/limit taken from QueryArgs, one zeroed
+%% counter per worker) and fold worker messages through handle_message/3.
+%% Returns {ok, UserAcc} on success. On a receive timeout the callback is
+%% invoked with {error, timeout}; an {error, Resp} result is returned as
+%% {ok, Resp}.
+docs_int(DbName, Workers, QueryArgs, Callback, Acc0) ->
+    #mrargs{limit = Limit, skip = Skip} = QueryArgs,
+    State = #collector{
+        db_name = DbName,
+        query_args = QueryArgs,
+        callback = Callback,
+        counters = fabric_dict:init(Workers, 0),
+        skip = Skip,
+        limit = Limit,
+        user_acc = Acc0,
+        update_seq = nil
+    },
+    % NOTE(review): `infinity` and `5000` are presumably the overall and
+    % per-message timeouts of rexi_utils:recv/6 -- confirm against rexi_utils
+    case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+        State, infinity, 5000) of
+        {ok, NewState} ->
+            {ok, NewState#collector.user_acc};
+        {timeout, NewState} ->
+            Callback({error, timeout}, NewState#collector.user_acc);
+        {error, Resp} ->
+            {ok, Resp}
+    end.
+
+%% Message handler passed to rexi_utils:recv/6. One clause per message kind:
+%% node down, worker exit, per-shard meta, a view row, and stream completion.
+
+% A node went down: delegate to fabric_view:check_down_shards/2
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
+    fabric_view:check_down_shards(State, NodeRef);
+
+% A worker exited: delegate to fabric_view:handle_worker_exit/3
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+    fabric_view:handle_worker_exit(State, Worker, Reason);
+
+% Per-shard meta: totals and offsets are summed across workers. The merged
+% meta is passed to the user callback only once no worker counter is 0 any
+% more, i.e. every worker has delivered its meta message.
+handle_message({meta, Meta0}, {Worker, From}, State) ->
+    Tot = couch_util:get_value(total, Meta0, 0),
+    Off = couch_util:get_value(offset, Meta0, 0),
+    #collector{
+        callback = Callback,
+        counters = Counters0,
+        total_rows = Total0,
+        offset = Offset0,
+        user_acc = AccIn
+    } = State,
+    % Assert that we don't have other messages from this
+    % worker when the total_and_offset message arrives.
+    0 = fabric_dict:lookup_element(Worker, Counters0),
+    rexi:stream_ack(From),
+    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    Total = Total0 + Tot,
+    Offset = Offset0 + Off,
+    case fabric_dict:any(0, Counters1) of
+        true ->
+            % Some worker has not sent its meta yet: just accumulate
+            {ok, State#collector{
+                counters = Counters1,
+                total_rows = Total,
+                offset = Offset
+            }};
+        false ->
+            % The reported offset includes the requested skip, capped at
+            % the total row count
+            FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+            Meta = [{total, Total}, {offset, FinalOffset}],
+            {Go, Acc} = Callback({meta, Meta}, AccIn),
+            {Go, State#collector{
+                counters = fabric_dict:decrement_all(Counters1),
+                total_rows = Total,
+                offset = FinalOffset,
+                user_acc = Acc
+            }}
+    end;
+
+% A row from a shard worker. Rows whose doc is `undecided` are resolved by
+% maybe_fetch_and_filter_doc/3. A row that resolves to a non-empty EJSON
+% object is merged into the pending rows by id; a `skip` result only acks
+% the worker and leaves the state unchanged.
+handle_message(#view_row{id = Id, doc = Doc} = Row0, {Worker, From}, State) ->
+    #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
+    case maybe_fetch_and_filter_doc(Id, Doc, State) of
+        {[_ | _]} = NewDoc ->
+            Row = Row0#view_row{doc = NewDoc},
+            Dir = Args#mrargs.direction,
+            % The {Worker, From} pair is stored on the row.
+            % NOTE(review): no stream_ack here -- presumably
+            % fabric_view:maybe_send_row/1 acks via the stored pair; confirm
+            Rows = merge_row(Dir, Row#view_row{worker={Worker, From}}, Rows0),
+            Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+            State1 = State#collector{rows=Rows, counters=Counters1},
+            fabric_view:maybe_send_row(State1);
+        skip ->
+            rexi:stream_ack(From),
+            {ok, State}
+    end;
+
+% A worker finished streaming: bump its counter and try to emit a row
+handle_message(complete, Worker, State) ->
+    Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
+    fabric_view:maybe_send_row(State#collector{counters = Counters}).
+
+
+%% Merge a single row into the row list, keyed on the row id, using
+%% lists:keymerge/3 for `fwd` and lists:rkeymerge/3 for `rev`.
+merge_row(fwd, Row, Rows) ->
+    lists:keymerge(#view_row.id, [Row], Rows);
+merge_row(rev, Row, Rows) ->
+    lists:rkeymerge(#view_row.id, [Row], Rows).
+
+
+%% Resolve the doc of a row on the coordinator. Any doc other than the atom
+%% `undecided` is returned unchanged. For `undecided` the document is looked
+%% up with couch_replicator:active_doc/2 and passed through
+%% couch_replicator_utils:filter_state/3 together with the `filter_states`
+%% value from the query args; a document that is not found yields `skip`.
+maybe_fetch_and_filter_doc(_Id, Doc, _State) when Doc =/= undecided ->
+    Doc;
+maybe_fetch_and_filter_doc(Id, undecided, State) ->
+    #collector{db_name = Db, query_args = #mrargs{extra = Extra}} = State,
+    Wanted = proplists:get_value(filter_states, Extra),
+    case couch_replicator:active_doc(Db, Id) of
+        {error, not_found} ->
+            % could have been deleted
+            skip;
+        {ok, {Props} = Info} ->
+            couch_replicator_utils:filter_state(
+                couch_util:get_value(state, Props), Wanted, Info)
+    end.
diff --git a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
new file mode 100644
index 000000000..d67f87548
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
@@ -0,0 +1,97 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_fabric_rpc).
+
+-export([
+    docs/3
+]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+%% Shard-side worker for the replicator docs listing. Sets the io priority,
+%% rewrites the query args to skip = 0 and limit = Skip + Limit, opens the
+%% shard with couch_db:open_int/2 and runs couch_mrview:query_all_docs/4 with
+%% docs_cb/2 as the row callback.
+%% NOTE(review): the skip/limit rewrite presumably leaves the actual skipping
+%% to the coordinator, which merges rows from all shards -- confirm.
+docs(DbName, Options, Args0) ->
+    set_io_priority(DbName, Options),
+    #mrargs{skip = Skip, limit = Limit, extra = Extra} = Args0,
+    FilterStates = proplists:get_value(filter_states, Extra),
+    Args = Args0#mrargs{skip = 0, limit = Skip + Limit},
+    HealthThreshold = couch_replicator_scheduler:health_threshold(),
+    {ok, Db} = couch_db:open_int(DbName, Options),
+    Acc = {DbName, FilterStates, HealthThreshold},
+    couch_mrview:query_all_docs(Db, Args, fun docs_cb/2, Acc).
+
+
+%% Callback for couch_mrview:query_all_docs/4. Meta is streamed as is. For a
+%% row the doc is replaced by the result of rep_doc_state/5; rows for which
+%% it returns `skip` are not streamed at all. `complete` ends the stream with
+%% rexi:stream_last/1. The accumulator is never modified.
+docs_cb({meta, Meta}, Acc) ->
+    ok = rexi:stream2({meta, Meta}),
+    {ok, Acc};
+docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) ->
+    Id = couch_util:get_value(id, Row),
+    Doc = couch_util:get_value(doc, Row),
+    ViewRow = #view_row{
+        id = Id,
+        key = couch_util:get_value(key, Row),
+        value = couch_util:get_value(value, Row)
+    },
+    % DbName here is the name docs/3 was called with and is passed to
+    % rep_doc_state/5 as its shard argument
+    case rep_doc_state(DbName, Id, Doc, States, HealthThreshold) of
+        skip ->
+            ok;
+        Other ->
+            ok = rexi:stream2(ViewRow#view_row{doc = Other})
+    end,
+    {ok, Acc};
+docs_cb(complete, Acc) ->
+    ok = rexi:stream_last(complete),
+    {ok, Acc}.
+
+
+%% Store the io priority in the process dictionary: the `io_priority` entry
+%% of Options when present, otherwise {interactive, DbName}.
+set_io_priority(DbName, Options) ->
+    case lists:keyfind(io_priority, 1, Options) of
+        {io_priority, Pri} ->
+            erlang:put(io_priority, Pri);
+        false ->
+            erlang:put(io_priority, {interactive, DbName})
+    end.
+
+
+%% Get the state of the replication document. If it is found and has a terminal
+%% state then it can be filtered and either included in the results or skipped.
+%% If it is not in a terminal state, look it up in the local doc processor ETS
+%% table. If it is there then filter by state. If it is not found there either
+%% then mark it as `undecided` and let the coordinator try to fetch it.
+%% The idea is to do as much work as possible locally and leave the minimum
+%% amount of work for the coordinator.
+rep_doc_state(_Shard, <<"_design/", _/binary>>, _, _, _) ->
+    % design documents are never listed
+    skip;
+rep_doc_state(Shard, Id, {[_ | _]} = Doc, States, HealthThreshold) ->
+    Info = couch_replicator:info_from_doc(mem3:dbname(Shard), Doc),
+    case get_doc_state(Info) of
+        null ->
+            % No state recorded in the document itself: consult the local
+            % doc processor, or leave the decision to the coordinator.
+            local_doc_state(Shard, Id, States, HealthThreshold);
+        DocState when is_atom(DocState) ->
+            couch_replicator_utils:filter_state(DocState, States, Info)
+    end.
+
+
+%% Look the document up in the local doc processor. A hit is filtered by its
+%% state; a miss is reported as `undecided`.
+local_doc_state(Shard, Id, States, HealthThreshold) ->
+    case couch_replicator_doc_processor:doc_lookup(Shard, Id,
+            HealthThreshold) of
+        {error, not_found} ->
+            undecided;
+        {ok, EtsInfo} ->
+            couch_replicator_utils:filter_state(
+                get_doc_state(EtsInfo), States, EtsInfo)
+    end.
+
+
+%% Extract the `state` property from an EJSON object.
+get_doc_state({Fields}) ->
+    couch_util:get_value(state, Fields).
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd.erl
index 6b4347267..364d09858 100644
--- a/src/couch_replicator/src/couch_replicator_httpd.erl
+++ b/src/couch_replicator/src/couch_replicator_httpd.erl
@@ -13,6 +13,12 @@
 -module(couch_replicator_httpd).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([
+    handle_req/1,
+    handle_scheduler_req/1
+]).
 
 -import(couch_httpd, [
     send_json/2,
@@ -24,13 +30,68 @@
     to_binary/1
 ]).
 
--export([handle_req/1]).
+
+-define(DEFAULT_TASK_LIMIT, 100).
+-define(REPDB, <<"_replicator">>).
+
+
+%% HTTP handler for the `_scheduler` endpoint. Only GET requests on the
+%% paths below are served; every other request is answered with
+%% "method not allowed".
+%%
+%%   _scheduler/jobs        - jobs gathered from couch_replicator_scheduler
+%%                            via rpc:multicall/3, sorted by id and paged
+%%                            with the `limit` and `skip` query parameters
+%%   _scheduler/jobs/JobId  - a single job; throws not_found if unknown
+%%   _scheduler/docs        - documents of the ?REPDB database, optionally
+%%                            filtered with the `states` query parameter
+%%   _scheduler/docs/DocId  - a single document; throws not_found if unknown
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) ->
+    Limit = couch_replicator_httpd_util:parse_int_param(Req, "limit",
+        ?DEFAULT_TASK_LIMIT, 0, infinity),
+    Skip = couch_replicator_httpd_util:parse_int_param(Req, "skip", 0, 0,
+        infinity),
+    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, jobs, []),
+    Flatlist = lists:concat(Replies),
+    % couch_replicator_scheduler:job_ejson/1 guarantees {id, Id} to be
+    % the first item in the list
+    Sorted = lists:sort(fun({[{id,A}|_]},{[{id,B}|_]}) -> A =< B end, Flatlist),
+    Total = length(Sorted),
+    % Clamp the offset so that skipping past the end yields an empty page
+    Offset = min(Skip, Total),
+    Sublist = lists:sublist(Sorted, Offset+1, Limit),
+    Sublist1 = [couch_replicator_httpd_util:update_db_name(Task)
+        || Task <- Sublist],
+    send_json(Req, {[{total_rows, Total}, {offset, Offset}, {jobs, Sublist1}]});
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
+    case couch_replicator:job(JobId) of
+        {ok, JobInfo} ->
+            send_json(Req, couch_replicator_httpd_util:update_db_name(JobInfo));
+        {error, not_found} ->
+            throw(not_found)
+    end;
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>]}=Req) ->
+    VArgs0 = couch_mrview_http:parse_params(Req, undefined),
+    StatesQs = chttpd:qs_value(Req, "states"),
+    States = couch_replicator_httpd_util:parse_replication_state_filter(StatesQs),
+    % The state filter travels to the workers in the `extra` field
+    VArgs1 = VArgs0#mrargs{
+        view_type = map,
+        include_docs = true,
+        reduce = false,
+        extra = [{filter_states, States}]
+    },
+    VArgs2 = couch_mrview_util:validate_args(VArgs1),
+    Opts = [{user_ctx, Req#httpd.user_ctx}],
+    Db = ?REPDB,
+    Max = chttpd:chunked_response_buffer_size(),
+    Acc = couch_replicator_httpd_util:docs_acc_new(Req, Db, Max),
+    Cb = fun couch_replicator_httpd_util:docs_cb/2,
+    {ok, RAcc} = couch_replicator_fabric:docs(Db, Opts, VArgs2, Cb, Acc),
+    {ok, couch_replicator_httpd_util:docs_acc_response(RAcc)};
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,DocId]}=Req) ->
+    UserCtx = Req#httpd.user_ctx,
+    case couch_replicator:doc(?REPDB, DocId, UserCtx#user_ctx.roles) of
+        {ok, DocInfo} ->
+            send_json(Req, couch_replicator_httpd_util:update_db_name(DocInfo));
+        {error, not_found} ->
+            throw(not_found)
+    end;
+handle_scheduler_req(Req) ->
+    send_method_not_allowed(Req, "GET,HEAD").
 
 
 handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
     couch_httpd:validate_ctype(Req, "application/json"),
     RepDoc = {Props} = couch_httpd:json_body_obj(Req),
-    validate_rep_props(Props),
+    couch_replicator_httpd_util:validate_rep_props(Props),
     case couch_replicator:replicate(RepDoc, UserCtx) of
     {error, {Error, Reason}} ->
         send_json(
@@ -51,15 +112,3 @@ handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
 
 handle_req(Req) ->
     send_method_not_allowed(Req, "POST").
-
-validate_rep_props([]) ->
-    ok;
-validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
-    lists:foreach(fun
-        ({_,V}) when is_binary(V) -> ok;
-        ({K,_}) -> throw({bad_request,
-            <<K/binary," value must be a string.">>})
-    end, Params),
-    validate_rep_props(Rest);
-validate_rep_props([_|Rest]) ->
-    validate_rep_props(Rest).
diff --git a/src/couch_replicator/src/couch_replicator_httpd_util.erl b/src/couch_replicator/src/couch_replicator_httpd_util.erl
new file mode 100644
index 000000000..624eddd2f
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_httpd_util.erl
@@ -0,0 +1,201 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_httpd_util).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([
+    validate_rep_props/1,
+    parse_int_param/5,
+    parse_replication_state_filter/1,
+    update_db_name/1,
+    docs_acc_new/3,
+    docs_acc_response/1,
+    docs_cb/2
+]).
+
+-import(couch_httpd, [
+    send_json/2,
+    send_json/3,
+    send_method_not_allowed/2
+]).
+
+-import(couch_util, [
+    to_binary/1
+]).
+
+
+%% Turn the comma separated `states` query string value into a list of state
+%% atoms. `undefined` (parameter absent) yields [], the wildcard filter.
+%% Names are lower-cased and must be existing atoms that also appear in
+%% couch_replicator:replication_states(); anything else is rejected with a
+%% {query_parse_error, Message} throw.
+parse_replication_state_filter(undefined) ->
+    [];  % This is the default (wildcard) filter
+parse_replication_state_filter(States) when is_list(States) ->
+    Known = couch_replicator:replication_states(),
+    Names = lists:map(fun string:to_lower/1, string:tokens(States, ",")),
+    Requested = try
+        % existing atoms only, so request input cannot create new atoms
+        lists:map(fun erlang:list_to_existing_atom/1, Names)
+    catch error:badarg ->
+        NotAtomMsg = io_lib:format("States must be one or more of ~w", [Known]),
+        throw({query_parse_error, ?l2b(NotAtomMsg)})
+    end,
+    KnownSet = sets:from_list(Known),
+    RequestedSet = sets:from_list(Requested),
+    case sets:to_list(sets:subtract(RequestedSet, KnownSet)) of
+        [_ | _] = Unknown ->
+            UnknownMsg = io_lib:format("Unknown states ~w. Choose from: ~w",
+                [Unknown, Known]),
+            throw({query_parse_error, ?l2b(UnknownMsg)});
+        [] ->
+            Requested
+    end.
+
+
+%% Read the integer query parameter Param from Req, falling back to Default
+%% when it is absent. Throws {query_parse_error, Message} when the value is
+%% not an integer or lies outside the inclusive range [Min, Max].
+parse_int_param(Req, Param, Default, Min, Max) ->
+    Parsed = try
+        Fallback = integer_to_list(Default),
+        list_to_integer(chttpd:qs_value(Req, Param, Fallback))
+    catch error:badarg ->
+        throw({query_parse_error,
+            ?l2b(io_lib:format("~s must be an integer", [Param]))})
+    end,
+    if
+        Parsed >= Min, Parsed =< Max ->
+            Parsed;
+        true ->
+            RangeMsg = io_lib:format("~s not in range of [~w,~w]",
+                [Param, Min, Max]),
+            throw({query_parse_error, ?l2b(RangeMsg)})
+    end.
+
+
+%% Check the properties of a replication request body. Only `query_params`
+%% is inspected: each of its values must be a binary, otherwise a
+%% {bad_request, Message} is thrown. All other properties are accepted.
+validate_rep_props([{<<"query_params">>, {Params}} | Rest]) ->
+    CheckParam = fun
+        ({_, Value}) when is_binary(Value) ->
+            ok;
+        ({Key, _}) ->
+            throw({bad_request, <<Key/binary," value must be a string.">>})
+    end,
+    lists:foreach(CheckParam, Params),
+    validate_rep_props(Rest);
+validate_rep_props([_ | Rest]) ->
+    validate_rep_props(Rest);
+validate_rep_props([]) ->
+    ok.
+
+
+%% Separator to emit before the next chunk: "" while none has been set.
+prepend_val(#vacc{prepend = undefined}) ->
+    "";
+prepend_val(#vacc{prepend = Prepend}) ->
+    Prepend.
+
+
+%% Append Data (of size Len) to the response buffer. If the buffer is not
+%% empty and adding Data would exceed the threshold, the current buffer is
+%% sent as a delayed chunk first and Data starts a new buffer. In both cases
+%% the separator for the next chunk is set to ",\r\n".
+maybe_flush_response(#vacc{bufsize = Buffered, threshold = Limit,
+        buffer = Pending, resp = Resp0} = Acc, Data, Len)
+        when Buffered > 0, Buffered + Len > Limit ->
+    {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, Pending),
+    Flushed = Acc#vacc{
+        prepend = ",\r\n",
+        buffer = Data,
+        bufsize = Len,
+        resp = Resp1
+    },
+    {ok, Flushed};
+maybe_flush_response(Acc, Data, Len) ->
+    #vacc{buffer = Pending, bufsize = Buffered} = Acc,
+    {ok, Acc#vacc{
+        prepend = ",\r\n",
+        buffer = [Pending | Data],
+        bufsize = Buffered + Len
+    }}.
+
+%% Create the accumulator used by docs_cb/2.
+docs_acc_new(Req, Db, Threshold) ->
+    #vacc{req = Req, db = Db, threshold = Threshold}.
+
+%% Extract the response from a docs_cb/2 accumulator.
+docs_acc_response(#vacc{} = Acc) ->
+    Acc#vacc.resp.
+
+%% Callback given to couch_replicator_fabric:docs/5. It turns the messages
+%% {meta, Meta}, {row, Row}, complete and {error, Reason} into a delayed,
+%% chunked JSON response, keeping all state in a #vacc record.
+%% Clause order matters: the `resp=undefined` clauses must come first because
+%% they decide how the response is started.
+
+% Error before anything was sent: reply with a plain error response
+docs_cb({error, Reason}, #vacc{resp=undefined}=Acc) ->
+    {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
+    {ok, Acc#vacc{resp=Resp}};
+
+docs_cb(complete, #vacc{resp=undefined}=Acc) ->
+    % Nothing in view
+    {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
+    {ok, Acc#vacc{resp=Resp}};
+
+docs_cb(Msg, #vacc{resp=undefined}=Acc) ->
+    %% Start response
+    Headers = [],
+    {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
+    % Re-dispatch the same message now that a response exists
+    docs_cb(Msg, Acc#vacc{resp=Resp, should_close=true});
+
+% Error after the response was started: send it as a delayed error
+docs_cb({error, Reason}, #vacc{resp=Resp}=Acc) ->
+    {ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
+    {ok, Acc#vacc{resp=Resp1}};
+
+docs_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
+    % Finish view output and possibly end the response
+    {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
+    case Acc#vacc.should_close of
+        true ->
+            {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+            {ok, Acc#vacc{resp=Resp2}};
+        _ ->
+            % Response stays open: reset the per-result flags and buffer
+            {ok, Acc#vacc{resp=Resp1, meta_sent=false, row_sent=false,
+                prepend=",\r\n", buffer=[], bufsize=0}}
+    end;
+
+docs_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
+    % Sending metadata as we've not sent it or any row yet
+    % total_rows and offset are emitted only when present in Meta; the
+    % opening of the "docs" array is always emitted
+    Parts = case couch_util:get_value(total, Meta) of
+        undefined -> [];
+        Total -> [io_lib:format("\"total_rows\":~p", [adjust_total(Total)])]
+    end ++ case couch_util:get_value(offset, Meta) of
+        undefined -> [];
+        Offset -> [io_lib:format("\"offset\":~p", [Offset])]
+    end ++ ["\"docs\":["],
+    Chunk = [prepend_val(Acc), "{", string:join(Parts, ","), "\r\n"],
+    {ok, AccOut} = maybe_flush_response(Acc, Chunk, iolist_size(Chunk)),
+    % No separator before the first row
+    {ok, AccOut#vacc{prepend="", meta_sent=true}};
+
+
+docs_cb({meta, _Meta}, #vacc{}=Acc) ->
+    %% ignore metadata
+    {ok, Acc};
+
+docs_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
+    %% sorted=false and row arrived before meta
+    % Adding another row
+    Chunk = [prepend_val(Acc), "{\"docs\":[\r\n", row_to_json(Row)],
+    maybe_flush_response(Acc#vacc{meta_sent=true, row_sent=true}, Chunk, iolist_size(Chunk));
+
+docs_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
+    % Adding another row
+    Chunk = [prepend_val(Acc), row_to_json(Row)],
+    maybe_flush_response(Acc#vacc{row_sent=true}, Chunk, iolist_size(Chunk)).
+
+
+%% Replace the `database` property of an EJSON object with its normalized
+%% form and move it to the front of the property list. The match fails
+%% (badmatch) if the object has no `database` property.
+update_db_name({Props}) ->
+    {value, {database, DbName}, Props1} = lists:keytake(database, 1, Props),
+    {[{database, normalize_db_name(DbName)} | Props1]}.
+
+%% Names starting with "shards/" are mapped through mem3:dbname/1; any other
+%% name is returned unchanged.
+normalize_db_name(<<"shards/", _/binary>> = DbName) ->
+    mem3:dbname(DbName);
+normalize_db_name(DbName) ->
+    DbName.
+
+%% Encode the `doc` value of a row as JSON after normalizing its db name.
+row_to_json(Row) ->
+    Doc0 = couch_util:get_value(doc, Row),
+    Doc1 = update_db_name(Doc0),
+    ?JSON_ENCODE(Doc1).
+
+
+%% Adjust Total as there is an automatically created validation design doc
+%% (never goes below 0)
+adjust_total(Total) when is_integer(Total), Total > 0 ->
+    Total - 1;
+adjust_total(Total) when is_integer(Total) ->
+    0. |