summaryrefslogtreecommitdiff
path: root/src/fabric/src/fabric_rpc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/fabric/src/fabric_rpc.erl')
-rw-r--r--src/fabric/src/fabric_rpc.erl49
1 files changed, 34 insertions, 15 deletions
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index c8aa19e0f..2b00a3668 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -32,15 +32,6 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
--record (cacc, {
- db,
- seq,
- args,
- options,
- pending,
- epochs
-}).
-
%% rpc endpoints
%% call to with_db will supply your M:F with a Db instance
%% and then remaining args
@@ -75,7 +66,7 @@ changes(DbName, Options, StartVector, DbOptions) ->
StartSeq = calculate_start_seq(Db, node(), StartVector),
Enum = fun changes_enumerator/2,
Opts = [{dir,Dir}],
- Acc0 = #cacc{
+ Acc0 = #fabric_changes_acc{
db = Db,
seq = StartSeq,
args = Args,
@@ -84,8 +75,8 @@ changes(DbName, Options, StartVector, DbOptions) ->
epochs = couch_db:get_epochs(Db)
},
try
- {ok, #cacc{seq=LastSeq, pending=Pending, epochs=Epochs}} =
- couch_db:fold_changes(Db, StartSeq, Enum, Acc0, Opts),
+ {ok, #fabric_changes_acc{seq=LastSeq, pending=Pending, epochs=Epochs}} =
+ do_changes(Db, StartSeq, Enum, Acc0, Opts),
rexi:stream_last({complete, [
{seq, {LastSeq, uuid(Db), couch_db:owner_of(Epochs, LastSeq)}},
{pending, Pending}
@@ -97,6 +88,34 @@ changes(DbName, Options, StartVector, DbOptions) ->
rexi:stream_last(Error)
end.
+do_changes(Db, StartSeq, Enum, Acc0, Opts) ->
+ #fabric_changes_acc {
+ args = Args
+ } = Acc0,
+ #changes_args {
+ filter = Filter
+ } = Args,
+ case Filter of
+ "_doc_ids" ->
+ % optimised code path, we’re looking up all doc_ids in the by-id instead of filtering
+ % the entire by-seq tree to find the doc_ids one by one
+ #changes_args {
+ filter_fun = {doc_ids, Style, DocIds},
+ dir = Dir
+ } = Args,
+ couch_changes:send_changes_doc_ids(Db, StartSeq, Dir, Enum, Acc0, {doc_ids, Style, DocIds});
+ "_design_docs" ->
+ % optimised code path, we’re looking up all design_docs in the by-id instead of
+ % filtering the entire by-seq tree to find the design_docs one by one
+ #changes_args {
+ filter_fun = {design_docs, Style},
+ dir = Dir
+ } = Args,
+ couch_changes:send_changes_design_docs(Db, StartSeq, Dir, Enum, Acc0, {design_docs, Style});
+ _ ->
+ couch_db:fold_changes(Db, StartSeq, Enum, Acc0, Opts)
+ end.
+
all_docs(DbName, Options, Args0) ->
case fabric_util:upgrade_mrargs(Args0) of
#mrargs{keys=undefined} = Args1 ->
@@ -482,9 +501,9 @@ reduce_cb(ok, ddoc_updated) ->
changes_enumerator(#full_doc_info{} = FDI, Acc) ->
changes_enumerator(couch_doc:to_doc_info(FDI), Acc);
changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) ->
- {ok, Acc#cacc{seq = Seq, pending = Acc#cacc.pending-1}};
+ {ok, Acc#fabric_changes_acc{seq = Seq, pending = Acc#fabric_changes_acc.pending-1}};
changes_enumerator(DocInfo, Acc) ->
- #cacc{
+ #fabric_changes_acc{
db = Db,
args = #changes_args{
include_docs = IncludeDocs,
@@ -513,7 +532,7 @@ changes_enumerator(DocInfo, Acc) ->
]}
end,
ok = rexi:stream2(ChangesRow),
- {ok, Acc#cacc{seq = Seq, pending = Pending-1}}.
+ {ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending-1}}.
doc_member(Shard, DocInfo, Opts, Filter) ->
case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of