summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2017-04-18 03:54:09 -0400
committerNick Vatamaniuc <vatamane@apache.org>2017-04-28 17:35:50 -0400
commit6df8cf62e411f525573b1e39db68f80ef6507f6b (patch)
tree81eac8fa75568acb0d51ae4f7a99961d7062de71
parent4841774575fb5771a245e5f046a26eaa7ac8dbb4 (diff)
downloadcouchdb-63012-scheduler.tar.gz
Add `_scheduler/{jobs,docs}` API endpoints63012-scheduler
The `_scheduler/jobs` endpoint provides a view of all replications managed by the scheduler. This endpoint includes more information on the replication than the `_scheduler/docs` endpoint, including the history of state transitions of the replication. This part was implemented by Benjamin Bastian. The `_scheduler/docs` endpoint provides a view of all replicator docs which have been seen by the scheduler. This endpoint includes useful information such as the state of the replication and the coordinator node. The implemention of `_scheduler/docs` mimics closely `_all_docs` behavior: similar pagination, HTTP request processing and fabric / rexi setup. The algorithm is roughly as follows: * http endpoint: - parses query args like it does for any view query - parses states to filter by, states are kept in the `extra` query arg * Call is made to couch_replicator_fabric. This is equivalent to fabric:all_docs. Here the typical fabric / rexi setup is happening. * Fabric worker is in `couch_replicator_fabric_rpc:docs/3`. This worker is similar to fabric_rpc's all_docs handler. However it is a bit more intricate to handle both replication document in terminal state as well as those which are active. - Before emitting it queries the state of the document to see if it is in a terminal state. If it is, it filters it and decides if it should be emitted or not. - If the document state cannot be found from the document. It tries to fetch active state from local node's doc processor via key based lookup. If it finds, it can also filter it based on state and emit it or skip. - If the document cannot be found in the node's local doc processor ETS table, the row is emitted with a doc value of `undecided`. This will let the coordinator fetch the state by possibly querying other nodes's doc processors. * Coordinator then starts handling messages. This also mostly mimics all_docs. At this point the most interesting thing is handling `undecided` docs. If one is found, then `replicator:active_doc/2` is queried. There, all nodes where document shards live are queries. This is better than a previous implementation where all nodes were queries all the time. * The final work happens in `couch_replicator_httpd` where the emitting callback is. There we only the doc is emitted (not keys, rows, values). Another thing that happens is the `Total` value is decremented to account for the always-present _design doc. Because of this a bunch of stuff was removed. Including an extra view which was build and managed by the previous implementation. As a bonus, other view-related parameters such as skip and limit seems to work out of the box and don't have to be implemented ad-hoc. Also, most importantly many thanks to Paul Davis for suggesting this approach. Jira: COUCHDB-3324
-rw-r--r--src/chttpd/src/chttpd_httpd_handlers.erl1
-rw-r--r--src/couch_replicator/src/couch_replicator_fabric.erl155
-rw-r--r--src/couch_replicator/src/couch_replicator_fabric_rpc.erl97
-rw-r--r--src/couch_replicator/src/couch_replicator_httpd.erl77
-rw-r--r--src/couch_replicator/src/couch_replicator_httpd_util.erl201
5 files changed, 517 insertions, 14 deletions
diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index b91aae9cc..9c3044126 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -19,6 +19,7 @@ url_handler(<<"favicon.ico">>) -> fun chttpd_misc:handle_favicon_req/1;
url_handler(<<"_utils">>) -> fun chttpd_misc:handle_utils_dir_req/1;
url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1;
url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1;
+url_handler(<<"_scheduler">>) -> fun couch_replicator_httpd:handle_scheduler_req/1;
url_handler(<<"_node">>) -> fun chttpd_misc:handle_node_req/1;
url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1;
url_handler(<<"_replicate">>) -> fun chttpd_misc:handle_replicate_req/1;
diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl
new file mode 100644
index 000000000..6998b2803
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_fabric.erl
@@ -0,0 +1,155 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_fabric).
+
+-export([
+ docs/5
+]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+docs(DbName, Options, QueryArgs, Callback, Acc) ->
+ Shards = mem3:shards(DbName),
+ Workers0 = fabric_util:submit_jobs(
+ Shards, couch_replicator_fabric_rpc, docs, [Options, QueryArgs]),
+ RexiMon = fabric_util:create_monitors(Workers0),
+ try
+ case fabric_util:stream_start(Workers0, #shard.ref) of
+ {ok, Workers} ->
+ try
+ docs_int(DbName, Workers, QueryArgs, Callback, Acc)
+ after
+ fabric_util:cleanup(Workers)
+ end;
+ {timeout, NewState} ->
+ DefunctWorkers = fabric_util:remove_done_workers(
+ NewState#stream_acc.workers, waiting
+ ),
+ fabric_util:log_timeout(
+ DefunctWorkers,
+ "replicator docs"
+ ),
+ Callback({error, timeout}, Acc);
+ {error, Error} ->
+ Callback({error, Error}, Acc)
+ end
+ after
+ rexi_monitor:stop(RexiMon)
+ end.
+
+
+docs_int(DbName, Workers, QueryArgs, Callback, Acc0) ->
+ #mrargs{limit = Limit, skip = Skip} = QueryArgs,
+ State = #collector{
+ db_name = DbName,
+ query_args = QueryArgs,
+ callback = Callback,
+ counters = fabric_dict:init(Workers, 0),
+ skip = Skip,
+ limit = Limit,
+ user_acc = Acc0,
+ update_seq = nil
+ },
+ case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+ State, infinity, 5000) of
+ {ok, NewState} ->
+ {ok, NewState#collector.user_acc};
+ {timeout, NewState} ->
+ Callback({error, timeout}, NewState#collector.user_acc);
+ {error, Resp} ->
+ {ok, Resp}
+ end.
+
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
+ fabric_view:check_down_shards(State, NodeRef);
+
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+ fabric_view:handle_worker_exit(State, Worker, Reason);
+
+handle_message({meta, Meta0}, {Worker, From}, State) ->
+ Tot = couch_util:get_value(total, Meta0, 0),
+ Off = couch_util:get_value(offset, Meta0, 0),
+ #collector{
+ callback = Callback,
+ counters = Counters0,
+ total_rows = Total0,
+ offset = Offset0,
+ user_acc = AccIn
+ } = State,
+ % Assert that we don't have other messages from this
+ % worker when the total_and_offset message arrives.
+ 0 = fabric_dict:lookup_element(Worker, Counters0),
+ rexi:stream_ack(From),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ Total = Total0 + Tot,
+ Offset = Offset0 + Off,
+ case fabric_dict:any(0, Counters1) of
+ true ->
+ {ok, State#collector{
+ counters = Counters1,
+ total_rows = Total,
+ offset = Offset
+ }};
+ false ->
+ FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+ Meta = [{total, Total}, {offset, FinalOffset}],
+ {Go, Acc} = Callback({meta, Meta}, AccIn),
+ {Go, State#collector{
+ counters = fabric_dict:decrement_all(Counters1),
+ total_rows = Total,
+ offset = FinalOffset,
+ user_acc = Acc
+ }}
+ end;
+
+handle_message(#view_row{id = Id, doc = Doc} = Row0, {Worker, From}, State) ->
+ #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
+ case maybe_fetch_and_filter_doc(Id, Doc, State) of
+ {[_ | _]} = NewDoc ->
+ Row = Row0#view_row{doc = NewDoc},
+ Dir = Args#mrargs.direction,
+ Rows = merge_row(Dir, Row#view_row{worker={Worker, From}}, Rows0),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ State1 = State#collector{rows=Rows, counters=Counters1},
+ fabric_view:maybe_send_row(State1);
+ skip ->
+ rexi:stream_ack(From),
+ {ok, State}
+ end;
+
+handle_message(complete, Worker, State) ->
+ Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
+ fabric_view:maybe_send_row(State#collector{counters = Counters}).
+
+
+merge_row(fwd, Row, Rows) ->
+ lists:keymerge(#view_row.id, [Row], Rows);
+merge_row(rev, Row, Rows) ->
+ lists:rkeymerge(#view_row.id, [Row], Rows).
+
+
+maybe_fetch_and_filter_doc(Id, undecided, State) ->
+ #collector{db_name = DbName, query_args = #mrargs{extra = Extra}} = State,
+ FilterStates = proplists:get_value(filter_states, Extra),
+ case couch_replicator:active_doc(DbName, Id) of
+ {ok, {Props} = DocInfo} ->
+ DocState = couch_util:get_value(state, Props),
+ couch_replicator_utils:filter_state(DocState, FilterStates, DocInfo);
+ {error, not_found} ->
+ skip % could have been deleted
+ end;
+maybe_fetch_and_filter_doc(_Id, Doc, _State) ->
+ Doc.
diff --git a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
new file mode 100644
index 000000000..d67f87548
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
@@ -0,0 +1,97 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_fabric_rpc).
+
+-export([
+ docs/3
+]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+docs(DbName, Options, Args0) ->
+ set_io_priority(DbName, Options),
+ #mrargs{skip = Skip, limit = Limit, extra = Extra} = Args0,
+ FilterStates = proplists:get_value(filter_states, Extra),
+ Args = Args0#mrargs{skip = 0, limit = Skip + Limit},
+ HealthThreshold = couch_replicator_scheduler:health_threshold(),
+ {ok, Db} = couch_db:open_int(DbName, Options),
+ Acc = {DbName, FilterStates, HealthThreshold},
+ couch_mrview:query_all_docs(Db, Args, fun docs_cb/2, Acc).
+
+
+docs_cb({meta, Meta}, Acc) ->
+ ok = rexi:stream2({meta, Meta}),
+ {ok, Acc};
+docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) ->
+ Id = couch_util:get_value(id, Row),
+ Doc = couch_util:get_value(doc, Row),
+ ViewRow = #view_row{
+ id = Id,
+ key = couch_util:get_value(key, Row),
+ value = couch_util:get_value(value, Row)
+ },
+ case rep_doc_state(DbName, Id, Doc, States, HealthThreshold) of
+ skip ->
+ ok;
+ Other ->
+ ok = rexi:stream2(ViewRow#view_row{doc = Other})
+ end,
+ {ok, Acc};
+docs_cb(complete, Acc) ->
+ ok = rexi:stream_last(complete),
+ {ok, Acc}.
+
+
+set_io_priority(DbName, Options) ->
+ case lists:keyfind(io_priority, 1, Options) of
+ {io_priority, Pri} ->
+ erlang:put(io_priority, Pri);
+ false ->
+ erlang:put(io_priority, {interactive, DbName})
+ end.
+
+
+%% Get the state of the replication document. If it is found and has a terminal
+%% state then it can be filtered and either included in the results or skipped.
+%% If it is not in a terminal state, look it up in the local doc processor ETS
+%% table. If it is there then filter by state. If it is not found there either
+%% then mark it as `undecided` and let the coordinator try to fetch it. The
+%% The idea is to do as much work as possible locally and leave the minimum
+%% amount of work for the coordinator.
+rep_doc_state(_Shard, <<"_design/", _/binary>>, _, _, _) ->
+ skip;
+rep_doc_state(Shard, Id, {[_ | _]} = Doc, States, HealthThreshold) ->
+ DbName = mem3:dbname(Shard),
+ DocInfo = couch_replicator:info_from_doc(DbName, Doc),
+ case get_doc_state(DocInfo) of
+ null ->
+ % Fetch from local doc processor. If there, filter by state.
+ % If not there, mark as undecided. Let coordinator figure it out.
+ case couch_replicator_doc_processor:doc_lookup(Shard, Id,
+ HealthThreshold) of
+ {ok, EtsInfo} ->
+ State = get_doc_state(EtsInfo),
+ couch_replicator_utils:filter_state(State, States, EtsInfo);
+ {error, not_found} ->
+ undecided
+ end;
+ OtherState when is_atom(OtherState) ->
+ couch_replicator_utils:filter_state(OtherState, States, DocInfo)
+ end.
+
+
+get_doc_state({Props})->
+ couch_util:get_value(state, Props).
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd.erl
index 6b4347267..364d09858 100644
--- a/src/couch_replicator/src/couch_replicator_httpd.erl
+++ b/src/couch_replicator/src/couch_replicator_httpd.erl
@@ -13,6 +13,12 @@
-module(couch_replicator_httpd).
-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([
+ handle_req/1,
+ handle_scheduler_req/1
+]).
-import(couch_httpd, [
send_json/2,
@@ -24,13 +30,68 @@
to_binary/1
]).
--export([handle_req/1]).
+
+-define(DEFAULT_TASK_LIMIT, 100).
+-define(REPDB, <<"_replicator">>).
+
+
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) ->
+ Limit = couch_replicator_httpd_util:parse_int_param(Req, "limit",
+ ?DEFAULT_TASK_LIMIT, 0, infinity),
+ Skip = couch_replicator_httpd_util:parse_int_param(Req, "skip", 0, 0,
+ infinity),
+ {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, jobs, []),
+ Flatlist = lists:concat(Replies),
+ % couch_replicator_scheduler:job_ejson/1 guarantees {id, Id} to be the
+ % the first item in the list
+ Sorted = lists:sort(fun({[{id,A}|_]},{[{id,B}|_]}) -> A =< B end, Flatlist),
+ Total = length(Sorted),
+ Offset = min(Skip, Total),
+ Sublist = lists:sublist(Sorted, Offset+1, Limit),
+ Sublist1 = [couch_replicator_httpd_util:update_db_name(Task)
+ || Task <- Sublist],
+ send_json(Req, {[{total_rows, Total}, {offset, Offset}, {jobs, Sublist1}]});
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
+ case couch_replicator:job(JobId) of
+ {ok, JobInfo} ->
+ send_json(Req, couch_replicator_httpd_util:update_db_name(JobInfo));
+ {error, not_found} ->
+ throw(not_found)
+ end;
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>]}=Req) ->
+ VArgs0 = couch_mrview_http:parse_params(Req, undefined),
+ StatesQs = chttpd:qs_value(Req, "states"),
+ States = couch_replicator_httpd_util:parse_replication_state_filter(StatesQs),
+ VArgs1 = VArgs0#mrargs{
+ view_type = map,
+ include_docs = true,
+ reduce = false,
+ extra = [{filter_states, States}]
+ },
+ VArgs2 = couch_mrview_util:validate_args(VArgs1),
+ Opts = [{user_ctx, Req#httpd.user_ctx}],
+ Db = ?REPDB,
+ Max = chttpd:chunked_response_buffer_size(),
+ Acc = couch_replicator_httpd_util:docs_acc_new(Req, Db, Max),
+ Cb = fun couch_replicator_httpd_util:docs_cb/2,
+ {ok, RAcc} = couch_replicator_fabric:docs(Db, Opts, VArgs2, Cb, Acc),
+ {ok, couch_replicator_httpd_util:docs_acc_response(RAcc)};
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,DocId]}=Req) ->
+ UserCtx = Req#httpd.user_ctx,
+ case couch_replicator:doc(?REPDB, DocId, UserCtx#user_ctx.roles) of
+ {ok, DocInfo} ->
+ send_json(Req, couch_replicator_httpd_util:update_db_name(DocInfo));
+ {error, not_found} ->
+ throw(not_found)
+ end;
+handle_scheduler_req(Req) ->
+ send_method_not_allowed(Req, "GET,HEAD").
handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
couch_httpd:validate_ctype(Req, "application/json"),
RepDoc = {Props} = couch_httpd:json_body_obj(Req),
- validate_rep_props(Props),
+ couch_replicator_httpd_utils:validate_rep_props(Props),
case couch_replicator:replicate(RepDoc, UserCtx) of
{error, {Error, Reason}} ->
send_json(
@@ -51,15 +112,3 @@ handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
handle_req(Req) ->
send_method_not_allowed(Req, "POST").
-
-validate_rep_props([]) ->
- ok;
-validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
- lists:foreach(fun
- ({_,V}) when is_binary(V) -> ok;
- ({K,_}) -> throw({bad_request,
- <<K/binary," value must be a string.">>})
- end, Params),
- validate_rep_props(Rest);
-validate_rep_props([_|Rest]) ->
- validate_rep_props(Rest).
diff --git a/src/couch_replicator/src/couch_replicator_httpd_util.erl b/src/couch_replicator/src/couch_replicator_httpd_util.erl
new file mode 100644
index 000000000..624eddd2f
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_httpd_util.erl
@@ -0,0 +1,201 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_httpd_util).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([
+ validate_rep_props/1,
+ parse_int_param/5,
+ parse_replication_state_filter/1,
+ update_db_name/1,
+ docs_acc_new/3,
+ docs_acc_response/1,
+ docs_cb/2
+]).
+
+-import(couch_httpd, [
+ send_json/2,
+ send_json/3,
+ send_method_not_allowed/2
+]).
+
+-import(couch_util, [
+ to_binary/1
+]).
+
+
+parse_replication_state_filter(undefined) ->
+ []; % This is the default (wildcard) filter
+parse_replication_state_filter(States) when is_list(States) ->
+ AllStates = couch_replicator:replication_states(),
+ StrStates = [string:to_lower(S) || S <- string:tokens(States, ",")],
+ AtomStates = try
+ [list_to_existing_atom(S) || S <- StrStates]
+ catch error:badarg ->
+ Msg1 = io_lib:format("States must be one or more of ~w", [AllStates]),
+ throw({query_parse_error, ?l2b(Msg1)})
+ end,
+ AllSet = sets:from_list(AllStates),
+ StatesSet = sets:from_list(AtomStates),
+ Diff = sets:to_list(sets:subtract(StatesSet, AllSet)),
+ case Diff of
+ [] ->
+ AtomStates;
+ _ ->
+ Args = [Diff, AllStates],
+ Msg2 = io_lib:format("Unknown states ~w. Choose from: ~w", Args),
+ throw({query_parse_error, ?l2b(Msg2)})
+ end.
+
+
+parse_int_param(Req, Param, Default, Min, Max) ->
+ IntVal = try
+ list_to_integer(chttpd:qs_value(Req, Param, integer_to_list(Default)))
+ catch error:badarg ->
+ Msg1 = io_lib:format("~s must be an integer", [Param]),
+ throw({query_parse_error, ?l2b(Msg1)})
+ end,
+ case IntVal >= Min andalso IntVal =< Max of
+ true ->
+ IntVal;
+ false ->
+ Msg2 = io_lib:format("~s not in range of [~w,~w]", [Param, Min, Max]),
+ throw({query_parse_error, ?l2b(Msg2)})
+ end.
+
+
+validate_rep_props([]) ->
+ ok;
+validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
+ lists:foreach(fun
+ ({_,V}) when is_binary(V) -> ok;
+ ({K,_}) -> throw({bad_request,
+ <<K/binary," value must be a string.">>})
+ end, Params),
+ validate_rep_props(Rest);
+validate_rep_props([_|Rest]) ->
+ validate_rep_props(Rest).
+
+
+prepend_val(#vacc{prepend=Prepend}) ->
+ case Prepend of
+ undefined ->
+ "";
+ _ ->
+ Prepend
+ end.
+
+
+maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
+ when Size > 0 andalso (Size + Len) > Max ->
+ #vacc{buffer = Buffer, resp = Resp} = Acc,
+ {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
+ {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = R1}};
+maybe_flush_response(Acc0, Data, Len) ->
+ #vacc{buffer = Buf, bufsize = Size} = Acc0,
+ Acc = Acc0#vacc{
+ prepend = ",\r\n",
+ buffer = [Buf | Data],
+ bufsize = Size + Len
+ },
+ {ok, Acc}.
+
+docs_acc_new(Req, Db, Threshold) ->
+ #vacc{db=Db, req=Req, threshold=Threshold}.
+
+docs_acc_response(#vacc{resp = Resp}) ->
+ Resp.
+
+docs_cb({error, Reason}, #vacc{resp=undefined}=Acc) ->
+ {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
+ {ok, Acc#vacc{resp=Resp}};
+
+docs_cb(complete, #vacc{resp=undefined}=Acc) ->
+ % Nothing in view
+ {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
+ {ok, Acc#vacc{resp=Resp}};
+
+docs_cb(Msg, #vacc{resp=undefined}=Acc) ->
+ %% Start response
+ Headers = [],
+ {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
+ docs_cb(Msg, Acc#vacc{resp=Resp, should_close=true});
+
+docs_cb({error, Reason}, #vacc{resp=Resp}=Acc) ->
+ {ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
+ {ok, Acc#vacc{resp=Resp1}};
+
+docs_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
+ % Finish view output and possibly end the response
+ {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
+ case Acc#vacc.should_close of
+ true ->
+ {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+ {ok, Acc#vacc{resp=Resp2}};
+ _ ->
+ {ok, Acc#vacc{resp=Resp1, meta_sent=false, row_sent=false,
+ prepend=",\r\n", buffer=[], bufsize=0}}
+ end;
+
+docs_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
+ % Sending metadata as we've not sent it or any row yet
+ Parts = case couch_util:get_value(total, Meta) of
+ undefined -> [];
+ Total -> [io_lib:format("\"total_rows\":~p", [adjust_total(Total)])]
+ end ++ case couch_util:get_value(offset, Meta) of
+ undefined -> [];
+ Offset -> [io_lib:format("\"offset\":~p", [Offset])]
+ end ++ ["\"docs\":["],
+ Chunk = [prepend_val(Acc), "{", string:join(Parts, ","), "\r\n"],
+ {ok, AccOut} = maybe_flush_response(Acc, Chunk, iolist_size(Chunk)),
+ {ok, AccOut#vacc{prepend="", meta_sent=true}};
+
+
+docs_cb({meta, _Meta}, #vacc{}=Acc) ->
+ %% ignore metadata
+ {ok, Acc};
+
+docs_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
+ %% sorted=false and row arrived before meta
+ % Adding another row
+ Chunk = [prepend_val(Acc), "{\"docs\":[\r\n", row_to_json(Row)],
+ maybe_flush_response(Acc#vacc{meta_sent=true, row_sent=true}, Chunk, iolist_size(Chunk));
+
+docs_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
+ % Adding another row
+ Chunk = [prepend_val(Acc), row_to_json(Row)],
+ maybe_flush_response(Acc#vacc{row_sent=true}, Chunk, iolist_size(Chunk)).
+
+
+update_db_name({Props}) ->
+ {value, {database, DbName}, Props1} = lists:keytake(database, 1, Props),
+ {[{database, normalize_db_name(DbName)} | Props1]}.
+
+normalize_db_name(<<"shards/", _/binary>> = DbName) ->
+ mem3:dbname(DbName);
+normalize_db_name(DbName) ->
+ DbName.
+
+row_to_json(Row) ->
+ Doc0 = couch_util:get_value(doc, Row),
+ Doc1 = update_db_name(Doc0),
+ ?JSON_ENCODE(Doc1).
+
+
+%% Adjust Total as there is an automatically created validation design doc
+adjust_total(Total) when is_integer(Total), Total > 0 ->
+ Total - 1;
+adjust_total(Total) when is_integer(Total) ->
+ 0.