summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/chttpd/src/chttpd_httpd_handlers.erl1
-rw-r--r--src/couch_replicator/src/couch_replicator_fabric.erl155
-rw-r--r--src/couch_replicator/src/couch_replicator_fabric_rpc.erl97
-rw-r--r--src/couch_replicator/src/couch_replicator_httpd.erl77
-rw-r--r--src/couch_replicator/src/couch_replicator_httpd_util.erl201
5 files changed, 517 insertions, 14 deletions
diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index b91aae9cc..9c3044126 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -19,6 +19,7 @@ url_handler(<<"favicon.ico">>) -> fun chttpd_misc:handle_favicon_req/1;
url_handler(<<"_utils">>) -> fun chttpd_misc:handle_utils_dir_req/1;
url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1;
url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1;
+url_handler(<<"_scheduler">>) -> fun couch_replicator_httpd:handle_scheduler_req/1;
url_handler(<<"_node">>) -> fun chttpd_misc:handle_node_req/1;
url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1;
url_handler(<<"_replicate">>) -> fun chttpd_misc:handle_replicate_req/1;
diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl
new file mode 100644
index 000000000..6998b2803
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_fabric.erl
@@ -0,0 +1,155 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_fabric).
+
+-export([
+ docs/5
+]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+docs(DbName, Options, QueryArgs, Callback, Acc) ->
+ Shards = mem3:shards(DbName),
+ Workers0 = fabric_util:submit_jobs(
+ Shards, couch_replicator_fabric_rpc, docs, [Options, QueryArgs]),
+ RexiMon = fabric_util:create_monitors(Workers0),
+ try
+ case fabric_util:stream_start(Workers0, #shard.ref) of
+ {ok, Workers} ->
+ try
+ docs_int(DbName, Workers, QueryArgs, Callback, Acc)
+ after
+ fabric_util:cleanup(Workers)
+ end;
+ {timeout, NewState} ->
+ DefunctWorkers = fabric_util:remove_done_workers(
+ NewState#stream_acc.workers, waiting
+ ),
+ fabric_util:log_timeout(
+ DefunctWorkers,
+ "replicator docs"
+ ),
+ Callback({error, timeout}, Acc);
+ {error, Error} ->
+ Callback({error, Error}, Acc)
+ end
+ after
+ rexi_monitor:stop(RexiMon)
+ end.
+
+
+docs_int(DbName, Workers, QueryArgs, Callback, Acc0) ->
+ #mrargs{limit = Limit, skip = Skip} = QueryArgs,
+ State = #collector{
+ db_name = DbName,
+ query_args = QueryArgs,
+ callback = Callback,
+ counters = fabric_dict:init(Workers, 0),
+ skip = Skip,
+ limit = Limit,
+ user_acc = Acc0,
+ update_seq = nil
+ },
+ case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+ State, infinity, 5000) of
+ {ok, NewState} ->
+ {ok, NewState#collector.user_acc};
+ {timeout, NewState} ->
+ Callback({error, timeout}, NewState#collector.user_acc);
+ {error, Resp} ->
+ {ok, Resp}
+ end.
+
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
+ fabric_view:check_down_shards(State, NodeRef);
+
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+ fabric_view:handle_worker_exit(State, Worker, Reason);
+
+handle_message({meta, Meta0}, {Worker, From}, State) ->
+ Tot = couch_util:get_value(total, Meta0, 0),
+ Off = couch_util:get_value(offset, Meta0, 0),
+ #collector{
+ callback = Callback,
+ counters = Counters0,
+ total_rows = Total0,
+ offset = Offset0,
+ user_acc = AccIn
+ } = State,
+ % Assert that we don't have other messages from this
+ % worker when the total_and_offset message arrives.
+ 0 = fabric_dict:lookup_element(Worker, Counters0),
+ rexi:stream_ack(From),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ Total = Total0 + Tot,
+ Offset = Offset0 + Off,
+ case fabric_dict:any(0, Counters1) of
+ true ->
+ {ok, State#collector{
+ counters = Counters1,
+ total_rows = Total,
+ offset = Offset
+ }};
+ false ->
+ FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+ Meta = [{total, Total}, {offset, FinalOffset}],
+ {Go, Acc} = Callback({meta, Meta}, AccIn),
+ {Go, State#collector{
+ counters = fabric_dict:decrement_all(Counters1),
+ total_rows = Total,
+ offset = FinalOffset,
+ user_acc = Acc
+ }}
+ end;
+
+handle_message(#view_row{id = Id, doc = Doc} = Row0, {Worker, From}, State) ->
+ #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
+ case maybe_fetch_and_filter_doc(Id, Doc, State) of
+ {[_ | _]} = NewDoc ->
+ Row = Row0#view_row{doc = NewDoc},
+ Dir = Args#mrargs.direction,
+ Rows = merge_row(Dir, Row#view_row{worker={Worker, From}}, Rows0),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ State1 = State#collector{rows=Rows, counters=Counters1},
+ fabric_view:maybe_send_row(State1);
+ skip ->
+ rexi:stream_ack(From),
+ {ok, State}
+ end;
+
+handle_message(complete, Worker, State) ->
+ Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
+ fabric_view:maybe_send_row(State#collector{counters = Counters}).
+
+
+merge_row(fwd, Row, Rows) ->
+ lists:keymerge(#view_row.id, [Row], Rows);
+merge_row(rev, Row, Rows) ->
+ lists:rkeymerge(#view_row.id, [Row], Rows).
+
+
+maybe_fetch_and_filter_doc(Id, undecided, State) ->
+ #collector{db_name = DbName, query_args = #mrargs{extra = Extra}} = State,
+ FilterStates = proplists:get_value(filter_states, Extra),
+ case couch_replicator:active_doc(DbName, Id) of
+ {ok, {Props} = DocInfo} ->
+ DocState = couch_util:get_value(state, Props),
+ couch_replicator_utils:filter_state(DocState, FilterStates, DocInfo);
+ {error, not_found} ->
+ skip % could have been deleted
+ end;
+maybe_fetch_and_filter_doc(_Id, Doc, _State) ->
+ Doc.
diff --git a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
new file mode 100644
index 000000000..d67f87548
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
@@ -0,0 +1,97 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_fabric_rpc).
+
+-export([
+ docs/3
+]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+docs(DbName, Options, Args0) ->
+ set_io_priority(DbName, Options),
+ #mrargs{skip = Skip, limit = Limit, extra = Extra} = Args0,
+ FilterStates = proplists:get_value(filter_states, Extra),
+ Args = Args0#mrargs{skip = 0, limit = Skip + Limit},
+ HealthThreshold = couch_replicator_scheduler:health_threshold(),
+ {ok, Db} = couch_db:open_int(DbName, Options),
+ Acc = {DbName, FilterStates, HealthThreshold},
+ couch_mrview:query_all_docs(Db, Args, fun docs_cb/2, Acc).
+
+
+docs_cb({meta, Meta}, Acc) ->
+ ok = rexi:stream2({meta, Meta}),
+ {ok, Acc};
+docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) ->
+ Id = couch_util:get_value(id, Row),
+ Doc = couch_util:get_value(doc, Row),
+ ViewRow = #view_row{
+ id = Id,
+ key = couch_util:get_value(key, Row),
+ value = couch_util:get_value(value, Row)
+ },
+ case rep_doc_state(DbName, Id, Doc, States, HealthThreshold) of
+ skip ->
+ ok;
+ Other ->
+ ok = rexi:stream2(ViewRow#view_row{doc = Other})
+ end,
+ {ok, Acc};
+docs_cb(complete, Acc) ->
+ ok = rexi:stream_last(complete),
+ {ok, Acc}.
+
+
+set_io_priority(DbName, Options) ->
+ case lists:keyfind(io_priority, 1, Options) of
+ {io_priority, Pri} ->
+ erlang:put(io_priority, Pri);
+ false ->
+ erlang:put(io_priority, {interactive, DbName})
+ end.
+
+
+%% Get the state of the replication document. If it is found and has a terminal
+%% state then it can be filtered and either included in the results or skipped.
+%% If it is not in a terminal state, look it up in the local doc processor ETS
+%% table. If it is there then filter by state. If it is not found there either
+%% then mark it as `undecided` and let the coordinator try to fetch it. The
+%% The idea is to do as much work as possible locally and leave the minimum
+%% amount of work for the coordinator.
+rep_doc_state(_Shard, <<"_design/", _/binary>>, _, _, _) ->
+ skip;
+rep_doc_state(Shard, Id, {[_ | _]} = Doc, States, HealthThreshold) ->
+ DbName = mem3:dbname(Shard),
+ DocInfo = couch_replicator:info_from_doc(DbName, Doc),
+ case get_doc_state(DocInfo) of
+ null ->
+ % Fetch from local doc processor. If there, filter by state.
+ % If not there, mark as undecided. Let coordinator figure it out.
+ case couch_replicator_doc_processor:doc_lookup(Shard, Id,
+ HealthThreshold) of
+ {ok, EtsInfo} ->
+ State = get_doc_state(EtsInfo),
+ couch_replicator_utils:filter_state(State, States, EtsInfo);
+ {error, not_found} ->
+ undecided
+ end;
+ OtherState when is_atom(OtherState) ->
+ couch_replicator_utils:filter_state(OtherState, States, DocInfo)
+ end.
+
+
+get_doc_state({Props})->
+ couch_util:get_value(state, Props).
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd.erl
index 6b4347267..364d09858 100644
--- a/src/couch_replicator/src/couch_replicator_httpd.erl
+++ b/src/couch_replicator/src/couch_replicator_httpd.erl
@@ -13,6 +13,12 @@
-module(couch_replicator_httpd).
-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([
+ handle_req/1,
+ handle_scheduler_req/1
+]).
-import(couch_httpd, [
send_json/2,
@@ -24,13 +30,68 @@
to_binary/1
]).
--export([handle_req/1]).
+
+-define(DEFAULT_TASK_LIMIT, 100).
+-define(REPDB, <<"_replicator">>).
+
+
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) ->
+ Limit = couch_replicator_httpd_util:parse_int_param(Req, "limit",
+ ?DEFAULT_TASK_LIMIT, 0, infinity),
+ Skip = couch_replicator_httpd_util:parse_int_param(Req, "skip", 0, 0,
+ infinity),
+ {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, jobs, []),
+ Flatlist = lists:concat(Replies),
+ % couch_replicator_scheduler:job_ejson/1 guarantees {id, Id} to be the
+ % the first item in the list
+ Sorted = lists:sort(fun({[{id,A}|_]},{[{id,B}|_]}) -> A =< B end, Flatlist),
+ Total = length(Sorted),
+ Offset = min(Skip, Total),
+ Sublist = lists:sublist(Sorted, Offset+1, Limit),
+ Sublist1 = [couch_replicator_httpd_util:update_db_name(Task)
+ || Task <- Sublist],
+ send_json(Req, {[{total_rows, Total}, {offset, Offset}, {jobs, Sublist1}]});
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
+ case couch_replicator:job(JobId) of
+ {ok, JobInfo} ->
+ send_json(Req, couch_replicator_httpd_util:update_db_name(JobInfo));
+ {error, not_found} ->
+ throw(not_found)
+ end;
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>]}=Req) ->
+ VArgs0 = couch_mrview_http:parse_params(Req, undefined),
+ StatesQs = chttpd:qs_value(Req, "states"),
+ States = couch_replicator_httpd_util:parse_replication_state_filter(StatesQs),
+ VArgs1 = VArgs0#mrargs{
+ view_type = map,
+ include_docs = true,
+ reduce = false,
+ extra = [{filter_states, States}]
+ },
+ VArgs2 = couch_mrview_util:validate_args(VArgs1),
+ Opts = [{user_ctx, Req#httpd.user_ctx}],
+ Db = ?REPDB,
+ Max = chttpd:chunked_response_buffer_size(),
+ Acc = couch_replicator_httpd_util:docs_acc_new(Req, Db, Max),
+ Cb = fun couch_replicator_httpd_util:docs_cb/2,
+ {ok, RAcc} = couch_replicator_fabric:docs(Db, Opts, VArgs2, Cb, Acc),
+ {ok, couch_replicator_httpd_util:docs_acc_response(RAcc)};
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,DocId]}=Req) ->
+ UserCtx = Req#httpd.user_ctx,
+ case couch_replicator:doc(?REPDB, DocId, UserCtx#user_ctx.roles) of
+ {ok, DocInfo} ->
+ send_json(Req, couch_replicator_httpd_util:update_db_name(DocInfo));
+ {error, not_found} ->
+ throw(not_found)
+ end;
+handle_scheduler_req(Req) ->
+ send_method_not_allowed(Req, "GET,HEAD").
handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
couch_httpd:validate_ctype(Req, "application/json"),
RepDoc = {Props} = couch_httpd:json_body_obj(Req),
- validate_rep_props(Props),
+ couch_replicator_httpd_utils:validate_rep_props(Props),
case couch_replicator:replicate(RepDoc, UserCtx) of
{error, {Error, Reason}} ->
send_json(
@@ -51,15 +112,3 @@ handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
handle_req(Req) ->
send_method_not_allowed(Req, "POST").
-
-validate_rep_props([]) ->
- ok;
-validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
- lists:foreach(fun
- ({_,V}) when is_binary(V) -> ok;
- ({K,_}) -> throw({bad_request,
- <<K/binary," value must be a string.">>})
- end, Params),
- validate_rep_props(Rest);
-validate_rep_props([_|Rest]) ->
- validate_rep_props(Rest).
diff --git a/src/couch_replicator/src/couch_replicator_httpd_util.erl b/src/couch_replicator/src/couch_replicator_httpd_util.erl
new file mode 100644
index 000000000..624eddd2f
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_httpd_util.erl
@@ -0,0 +1,201 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_httpd_util).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([
+ validate_rep_props/1,
+ parse_int_param/5,
+ parse_replication_state_filter/1,
+ update_db_name/1,
+ docs_acc_new/3,
+ docs_acc_response/1,
+ docs_cb/2
+]).
+
+-import(couch_httpd, [
+ send_json/2,
+ send_json/3,
+ send_method_not_allowed/2
+]).
+
+-import(couch_util, [
+ to_binary/1
+]).
+
+
+parse_replication_state_filter(undefined) ->
+ []; % This is the default (wildcard) filter
+parse_replication_state_filter(States) when is_list(States) ->
+ AllStates = couch_replicator:replication_states(),
+ StrStates = [string:to_lower(S) || S <- string:tokens(States, ",")],
+ AtomStates = try
+ [list_to_existing_atom(S) || S <- StrStates]
+ catch error:badarg ->
+ Msg1 = io_lib:format("States must be one or more of ~w", [AllStates]),
+ throw({query_parse_error, ?l2b(Msg1)})
+ end,
+ AllSet = sets:from_list(AllStates),
+ StatesSet = sets:from_list(AtomStates),
+ Diff = sets:to_list(sets:subtract(StatesSet, AllSet)),
+ case Diff of
+ [] ->
+ AtomStates;
+ _ ->
+ Args = [Diff, AllStates],
+ Msg2 = io_lib:format("Unknown states ~w. Choose from: ~w", Args),
+ throw({query_parse_error, ?l2b(Msg2)})
+ end.
+
+
+parse_int_param(Req, Param, Default, Min, Max) ->
+ IntVal = try
+ list_to_integer(chttpd:qs_value(Req, Param, integer_to_list(Default)))
+ catch error:badarg ->
+ Msg1 = io_lib:format("~s must be an integer", [Param]),
+ throw({query_parse_error, ?l2b(Msg1)})
+ end,
+ case IntVal >= Min andalso IntVal =< Max of
+ true ->
+ IntVal;
+ false ->
+ Msg2 = io_lib:format("~s not in range of [~w,~w]", [Param, Min, Max]),
+ throw({query_parse_error, ?l2b(Msg2)})
+ end.
+
+
+validate_rep_props([]) ->
+ ok;
+validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
+ lists:foreach(fun
+ ({_,V}) when is_binary(V) -> ok;
+ ({K,_}) -> throw({bad_request,
+ <<K/binary," value must be a string.">>})
+ end, Params),
+ validate_rep_props(Rest);
+validate_rep_props([_|Rest]) ->
+ validate_rep_props(Rest).
+
+
+prepend_val(#vacc{prepend=Prepend}) ->
+ case Prepend of
+ undefined ->
+ "";
+ _ ->
+ Prepend
+ end.
+
+
+maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
+ when Size > 0 andalso (Size + Len) > Max ->
+ #vacc{buffer = Buffer, resp = Resp} = Acc,
+ {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
+ {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = R1}};
+maybe_flush_response(Acc0, Data, Len) ->
+ #vacc{buffer = Buf, bufsize = Size} = Acc0,
+ Acc = Acc0#vacc{
+ prepend = ",\r\n",
+ buffer = [Buf | Data],
+ bufsize = Size + Len
+ },
+ {ok, Acc}.
+
+docs_acc_new(Req, Db, Threshold) ->
+ #vacc{db=Db, req=Req, threshold=Threshold}.
+
+docs_acc_response(#vacc{resp = Resp}) ->
+ Resp.
+
+docs_cb({error, Reason}, #vacc{resp=undefined}=Acc) ->
+ {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
+ {ok, Acc#vacc{resp=Resp}};
+
+docs_cb(complete, #vacc{resp=undefined}=Acc) ->
+ % Nothing in view
+ {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
+ {ok, Acc#vacc{resp=Resp}};
+
+docs_cb(Msg, #vacc{resp=undefined}=Acc) ->
+ %% Start response
+ Headers = [],
+ {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
+ docs_cb(Msg, Acc#vacc{resp=Resp, should_close=true});
+
+docs_cb({error, Reason}, #vacc{resp=Resp}=Acc) ->
+ {ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
+ {ok, Acc#vacc{resp=Resp1}};
+
+docs_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
+ % Finish view output and possibly end the response
+ {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
+ case Acc#vacc.should_close of
+ true ->
+ {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+ {ok, Acc#vacc{resp=Resp2}};
+ _ ->
+ {ok, Acc#vacc{resp=Resp1, meta_sent=false, row_sent=false,
+ prepend=",\r\n", buffer=[], bufsize=0}}
+ end;
+
+docs_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
+ % Sending metadata as we've not sent it or any row yet
+ Parts = case couch_util:get_value(total, Meta) of
+ undefined -> [];
+ Total -> [io_lib:format("\"total_rows\":~p", [adjust_total(Total)])]
+ end ++ case couch_util:get_value(offset, Meta) of
+ undefined -> [];
+ Offset -> [io_lib:format("\"offset\":~p", [Offset])]
+ end ++ ["\"docs\":["],
+ Chunk = [prepend_val(Acc), "{", string:join(Parts, ","), "\r\n"],
+ {ok, AccOut} = maybe_flush_response(Acc, Chunk, iolist_size(Chunk)),
+ {ok, AccOut#vacc{prepend="", meta_sent=true}};
+
+
+docs_cb({meta, _Meta}, #vacc{}=Acc) ->
+ %% ignore metadata
+ {ok, Acc};
+
+docs_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
+ %% sorted=false and row arrived before meta
+ % Adding another row
+ Chunk = [prepend_val(Acc), "{\"docs\":[\r\n", row_to_json(Row)],
+ maybe_flush_response(Acc#vacc{meta_sent=true, row_sent=true}, Chunk, iolist_size(Chunk));
+
+docs_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
+ % Adding another row
+ Chunk = [prepend_val(Acc), row_to_json(Row)],
+ maybe_flush_response(Acc#vacc{row_sent=true}, Chunk, iolist_size(Chunk)).
+
+
+update_db_name({Props}) ->
+ {value, {database, DbName}, Props1} = lists:keytake(database, 1, Props),
+ {[{database, normalize_db_name(DbName)} | Props1]}.
+
+normalize_db_name(<<"shards/", _/binary>> = DbName) ->
+ mem3:dbname(DbName);
+normalize_db_name(DbName) ->
+ DbName.
+
+row_to_json(Row) ->
+ Doc0 = couch_util:get_value(doc, Row),
+ Doc1 = update_db_name(Doc0),
+ ?JSON_ENCODE(Doc1).
+
+
+%% Adjust Total as there is an automatically created validation design doc
+adjust_total(Total) when is_integer(Total), Total > 0 ->
+ Total - 1;
+adjust_total(Total) when is_integer(Total) ->
+ 0.