diff options
Diffstat (limited to 'src/fabric/src/fabric_rpc.erl')
-rw-r--r-- | src/fabric/src/fabric_rpc.erl | 664 |
1 files changed, 0 insertions, 664 deletions
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl deleted file mode 100644 index 85da3ff12..000000000 --- a/src/fabric/src/fabric_rpc.erl +++ /dev/null @@ -1,664 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_rpc). - --export([get_db_info/1, get_doc_count/1, get_design_doc_count/1, - get_update_seq/1]). --export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3, - get_missing_revs/2, get_missing_revs/3, update_docs/3]). --export([all_docs/3, changes/3, map_view/4, reduce_view/4, group_info/2]). --export([create_db/1, create_db/2, delete_db/1, reset_validation_funs/1, - set_security/3, set_revs_limit/3, create_shard_db_doc/2, - delete_shard_db_doc/2, get_partition_info/2]). --export([get_all_security/2, open_shard/2]). --export([compact/1, compact/2]). --export([get_purge_seq/2, purge_docs/3, set_purge_infos_limit/3]). - --export([get_db_info/2, get_doc_count/2, get_design_doc_count/2, - get_update_seq/2, changes/4, map_view/5, reduce_view/5, - group_info/3, update_mrview/4]). - --include_lib("fabric/include/fabric.hrl"). --include_lib("couch/include/couch_db.hrl"). --include_lib("couch_mrview/include/couch_mrview.hrl"). - -%% rpc endpoints -%% call to with_db will supply your M:F with a Db instance -%% and then remaining args - -%% @equiv changes(DbName, Args, StartSeq, []) -changes(DbName, Args, StartSeq) -> - changes(DbName, Args, StartSeq, []). - -changes(DbName, #changes_args{} = Args, StartSeq, DbOptions) -> - changes(DbName, [Args], StartSeq, DbOptions); -changes(DbName, Options, StartVector, DbOptions) -> - set_io_priority(DbName, DbOptions), - Args0 = lists:keyfind(changes_args, 1, Options), - #changes_args{dir=Dir, filter_fun=Filter} = Args0, - Args = case Filter of - {fetch, custom, Style, Req, {DDocId, Rev}, FName} -> - {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev), - Args0#changes_args{ - filter_fun={custom, Style, Req, DDoc, FName} - }; - {fetch, view, Style, {DDocId, Rev}, VName} -> - {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev), - Args0#changes_args{filter_fun={view, Style, DDoc, VName}}; - _ -> - Args0 - end, - - DbOpenOptions = Args#changes_args.db_open_options ++ DbOptions, - case get_or_create_db(DbName, DbOpenOptions) of - {ok, Db} -> - StartSeq = calculate_start_seq(Db, node(), StartVector), - Enum = fun changes_enumerator/2, - Opts = [{dir,Dir}], - Acc0 = #fabric_changes_acc{ - db = Db, - seq = StartSeq, - args = Args, - options = Options, - pending = couch_db:count_changes_since(Db, StartSeq), - epochs = couch_db:get_epochs(Db) - }, - try - {ok, #fabric_changes_acc{seq=LastSeq, pending=Pending, epochs=Epochs}} = - do_changes(Db, StartSeq, Enum, Acc0, Opts), - rexi:stream_last({complete, [ - {seq, {LastSeq, uuid(Db), couch_db:owner_of(Epochs, LastSeq)}}, - {pending, Pending} - ]}) - after - couch_db:close(Db) - end; - Error -> - rexi:stream_last(Error) - end. - -do_changes(Db, StartSeq, Enum, Acc0, Opts) -> - #fabric_changes_acc { - args = Args - } = Acc0, - #changes_args { - filter = Filter - } = Args, - case Filter of - "_doc_ids" -> - % optimised code path, we’re looking up all doc_ids in the by-id instead of filtering - % the entire by-seq tree to find the doc_ids one by one - #changes_args { - filter_fun = {doc_ids, Style, DocIds}, - dir = Dir - } = Args, - couch_changes:send_changes_doc_ids(Db, StartSeq, Dir, Enum, Acc0, {doc_ids, Style, DocIds}); - "_design_docs" -> - % optimised code path, we’re looking up all design_docs in the by-id instead of - % filtering the entire by-seq tree to find the design_docs one by one - #changes_args { - filter_fun = {design_docs, Style}, - dir = Dir - } = Args, - couch_changes:send_changes_design_docs(Db, StartSeq, Dir, Enum, Acc0, {design_docs, Style}); - _ -> - couch_db:fold_changes(Db, StartSeq, Enum, Acc0, Opts) - end. - -all_docs(DbName, Options, Args0) -> - case fabric_util:upgrade_mrargs(Args0) of - #mrargs{keys=undefined} = Args -> - set_io_priority(DbName, Options), - {ok, Db} = get_or_create_db(DbName, Options), - CB = get_view_cb(Args), - couch_mrview:query_all_docs(Db, Args, CB, Args) - end. - -update_mrview(DbName, {DDocId, Rev}, ViewName, Args0) -> - {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev), - couch_util:with_db(DbName, fun(Db) -> - UpdateSeq = couch_db:get_update_seq(Db), - {ok, Pid, _} = couch_mrview:get_view_index_pid( - Db, DDoc, ViewName, fabric_util:upgrade_mrargs(Args0)), - couch_index:get_state(Pid, UpdateSeq) - end). - -%% @equiv map_view(DbName, DDoc, ViewName, Args0, []) -map_view(DbName, DDocInfo, ViewName, Args0) -> - map_view(DbName, DDocInfo, ViewName, Args0, []). - -map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> - {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev), - map_view(DbName, DDoc, ViewName, Args0, DbOptions); -map_view(DbName, DDoc, ViewName, Args0, DbOptions) -> - set_io_priority(DbName, DbOptions), - Args = fabric_util:upgrade_mrargs(Args0), - {ok, Db} = get_or_create_db(DbName, DbOptions), - CB = get_view_cb(Args), - couch_mrview:query_view(Db, DDoc, ViewName, Args, CB, Args). - -%% @equiv reduce_view(DbName, DDoc, ViewName, Args0) -reduce_view(DbName, DDocInfo, ViewName, Args0) -> - reduce_view(DbName, DDocInfo, ViewName, Args0, []). - -reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> - {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev), - reduce_view(DbName, DDoc, ViewName, Args0, DbOptions); -reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) -> - set_io_priority(DbName, DbOptions), - Args = fabric_util:upgrade_mrargs(Args0), - {ok, Db} = get_or_create_db(DbName, DbOptions), - VAcc0 = #vacc{db=Db}, - couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0). - -create_db(DbName) -> - create_db(DbName, []). - -create_db(DbName, Options) -> - rexi:reply(case couch_server:create(DbName, Options) of - {ok, _} -> - ok; - Error -> - Error - end). - -create_shard_db_doc(_, Doc) -> - rexi:reply(mem3_util:write_db_doc(Doc)). - -delete_db(DbName) -> - couch_server:delete(DbName, []). - -delete_shard_db_doc(_, DocId) -> - rexi:reply(mem3_util:delete_db_doc(DocId)). - -%% @equiv get_db_info(DbName, []) -get_db_info(DbName) -> - get_db_info(DbName, []). - -get_db_info(DbName, DbOptions) -> - with_db(DbName, DbOptions, {couch_db, get_db_info, []}). - -get_partition_info(DbName, Partition) -> - with_db(DbName, [], {couch_db, get_partition_info, [Partition]}). - -%% equiv get_doc_count(DbName, []) -get_doc_count(DbName) -> - get_doc_count(DbName, []). - -get_doc_count(DbName, DbOptions) -> - with_db(DbName, DbOptions, {couch_db, get_doc_count, []}). - -%% equiv get_design_doc_count(DbName, []) -get_design_doc_count(DbName) -> - get_design_doc_count(DbName, []). - -get_design_doc_count(DbName, DbOptions) -> - with_db(DbName, DbOptions, {couch_db, get_design_doc_count, []}). - -%% equiv get_update_seq(DbName, []) -get_update_seq(DbName) -> - get_update_seq(DbName, []). - -get_update_seq(DbName, DbOptions) -> - with_db(DbName, DbOptions, {couch_db, get_update_seq, []}). - -set_security(DbName, SecObj, Options0) -> - Options = case lists:keyfind(io_priority, 1, Options0) of - false -> - [{io_priority, {db_meta, security}}|Options0]; - _ -> - Options0 - end, - with_db(DbName, Options, {couch_db, set_security, [SecObj]}). - -get_all_security(DbName, Options) -> - with_db(DbName, Options, {couch_db, get_security, []}). - -set_revs_limit(DbName, Limit, Options) -> - with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}). - -set_purge_infos_limit(DbName, Limit, Options) -> - with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}). - -open_doc(DbName, DocId, Options) -> - with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}). - -open_revs(DbName, Id, Revs, Options) -> - with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}). - -get_full_doc_info(DbName, DocId, Options) -> - with_db(DbName, Options, {couch_db, get_full_doc_info, [DocId]}). - -get_doc_info(DbName, DocId, Options) -> - with_db(DbName, Options, {couch_db, get_doc_info, [DocId]}). - -get_missing_revs(DbName, IdRevsList) -> - get_missing_revs(DbName, IdRevsList, []). - -get_missing_revs(DbName, IdRevsList, Options) -> - % reimplement here so we get [] for Ids with no missing revs in response - set_io_priority(DbName, Options), - rexi:reply(case get_or_create_db(DbName, Options) of - {ok, Db} -> - Ids = [Id1 || {Id1, _Revs} <- IdRevsList], - {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> - case FullDocInfoResult of - #full_doc_info{rev_tree=RevisionTree} = FullInfo -> - MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs), - {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)}; - not_found -> - {Id, Revs, []} - end - end, IdRevsList, couch_db:get_full_doc_infos(Db, Ids))}; - Error -> - Error - end). - -update_docs(DbName, Docs0, Options) -> - {Docs1, Type} = case couch_util:get_value(read_repair, Options) of - NodeRevs when is_list(NodeRevs) -> - Filtered = read_repair_filter(DbName, Docs0, NodeRevs, Options), - {Filtered, replicated_changes}; - undefined -> - X = case proplists:get_value(replicated_changes, Options) of - true -> replicated_changes; - _ -> interactive_edit - end, - {Docs0, X} - end, - Docs2 = make_att_readers(Docs1), - with_db(DbName, Options, {couch_db, update_docs, [Docs2, Options, Type]}). - - -get_purge_seq(DbName, Options) -> - with_db(DbName, Options, {couch_db, get_purge_seq, []}). - -purge_docs(DbName, UUIdsIdsRevs, Options) -> - with_db(DbName, Options, {couch_db, purge_docs, [UUIdsIdsRevs, Options]}). - -%% @equiv group_info(DbName, DDocId, []) -group_info(DbName, DDocId) -> - group_info(DbName, DDocId, []). - -group_info(DbName, DDocId, DbOptions) -> - with_db(DbName, DbOptions, {couch_mrview, get_info, [DDocId]}). - -reset_validation_funs(DbName) -> - case get_or_create_db(DbName, []) of - {ok, Db} -> - couch_db:reload_validation_funs(Db); - _ -> - ok - end. - -open_shard(Name, Opts) -> - set_io_priority(Name, Opts), - try - rexi:reply(couch_db:open(Name, Opts)) - catch exit:{timeout, _} -> - couch_stats:increment_counter([fabric, open_shard, timeouts]) - end. - -compact(DbName) -> - with_db(DbName, [], {couch_db, start_compact, []}). - -compact(ShardName, DesignName) -> - {ok, Pid} = couch_index_server:get_index( - couch_mrview_index, ShardName, <<"_design/", DesignName/binary>>), - Ref = erlang:make_ref(), - Pid ! {'$gen_call', {self(), Ref}, compact}. - -%% -%% internal -%% - -with_db(DbName, Options, {M,F,A}) -> - set_io_priority(DbName, Options), - case get_or_create_db(DbName, Options) of - {ok, Db} -> - rexi:reply(try - apply(M, F, [Db | A]) - catch Exception -> - Exception; - error:Reason -> - couch_log:error("rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason, - clean_stack()]), - {error, Reason} - end); - Error -> - rexi:reply(Error) - end. - - -read_repair_filter(DbName, Docs, NodeRevs, Options) -> - set_io_priority(DbName, Options), - case get_or_create_db(DbName, Options) of - {ok, Db} -> - try - read_repair_filter(Db, Docs, NodeRevs) - after - couch_db:close(Db) - end; - Error -> - rexi:reply(Error) - end. - - -% A read repair operation may have been triggered by a node -% that was out of sync with the local node. Thus, any time -% we receive a read repair request we need to check if we -% may have recently purged any of the given revisions and -% ignore them if so. -% -% This is accomplished by looking at the purge infos that we -% have locally that have not been replicated to the remote -% node. The logic here is that we may have received the purge -% request before the remote shard copy. So to check that we -% need to look at the purge infos that we have locally but -% have not yet sent to the remote copy. -% -% NodeRevs is a list of the {node(), [rev()]} tuples passed -% as the read_repair option to update_docs. -read_repair_filter(Db, Docs, NodeRevs) -> - [#doc{id = DocId} | _] = Docs, - NonLocalNodeRevs = [NR || {N, _} = NR <- NodeRevs, N /= node()], - Nodes = lists:usort([Node || {Node, _} <- NonLocalNodeRevs]), - NodeSeqs = get_node_seqs(Db, Nodes), - - DbPSeq = couch_db:get_purge_seq(Db), - Lag = config:get_integer("couchdb", "read_repair_lag", 100), - - % Filter out read-repair updates from any node that is - % so out of date that it would force us to scan a large - % number of purge infos - NodeFiltFun = fun({Node, _Revs}) -> - {Node, NodeSeq} = lists:keyfind(Node, 1, NodeSeqs), - NodeSeq >= DbPSeq - Lag - end, - RecentNodeRevs = lists:filter(NodeFiltFun, NonLocalNodeRevs), - - % For each node we scan the purge infos to filter out any - % revisions that have been locally purged since we last - % replicated to the remote node's shard copy. - AllowableRevs = lists:foldl(fun({Node, Revs}, RevAcc) -> - {Node, StartSeq} = lists:keyfind(Node, 1, NodeSeqs), - FoldFun = fun({_PSeq, _UUID, PDocId, PRevs}, InnerAcc) -> - if PDocId /= DocId -> {ok, InnerAcc}; true -> - {ok, InnerAcc -- PRevs} - end - end, - {ok, FiltRevs} = couch_db:fold_purge_infos(Db, StartSeq, FoldFun, Revs), - lists:usort(FiltRevs ++ RevAcc) - end, [], RecentNodeRevs), - - % Finally, filter the doc updates to only include revisions - % that have not been purged locally. - DocFiltFun = fun(#doc{revs = {Pos, [Rev | _]}}) -> - lists:member({Pos, Rev}, AllowableRevs) - end, - lists:filter(DocFiltFun, Docs). - - -get_node_seqs(Db, Nodes) -> - % Gather the list of {Node, PurgeSeq} pairs for all nodes - % that are present in our read repair group - FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) -> - case Id of - <<?LOCAL_DOC_PREFIX, "purge-mem3-", _/binary>> -> - TgtNode = couch_util:get_value(<<"target_node">>, Props), - PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props), - case lists:keyfind(TgtNode, 1, Acc) of - {_, OldSeq} -> - NewSeq = erlang:max(OldSeq, PurgeSeq), - NewEntry = {TgtNode, NewSeq}, - NewAcc = lists:keyreplace(TgtNode, 1, Acc, NewEntry), - {ok, NewAcc}; - false -> - {ok, Acc} - end; - _ -> - % We've processed all _local mem3 purge docs - {stop, Acc} - end - end, - InitAcc = [{list_to_binary(atom_to_list(Node)), 0} || Node <- Nodes], - Opts = [{start_key, <<?LOCAL_DOC_PREFIX, "purge-mem3-">>}], - {ok, NodeBinSeqs} = couch_db:fold_local_docs(Db, FoldFun, InitAcc, Opts), - [{list_to_existing_atom(binary_to_list(N)), S} || {N, S} <- NodeBinSeqs]. - - - -get_or_create_db(DbName, Options) -> - mem3_util:get_or_create_db(DbName, Options). - - -get_view_cb(#mrargs{extra = Options}) -> - case couch_util:get_value(callback, Options) of - {Mod, Fun} when is_atom(Mod), is_atom(Fun) -> - fun Mod:Fun/2; - _ -> - fun view_cb/2 - end; -get_view_cb(_) -> - fun view_cb/2. - - -view_cb({meta, Meta}, Acc) -> - % Map function starting - ok = rexi:stream2({meta, Meta}), - {ok, Acc}; -view_cb({row, Row}, Acc) -> - % Adding another row - ViewRow = #view_row{ - id = couch_util:get_value(id, Row), - key = couch_util:get_value(key, Row), - value = couch_util:get_value(value, Row), - doc = couch_util:get_value(doc, Row) - }, - ok = rexi:stream2(ViewRow), - {ok, Acc}; -view_cb(complete, Acc) -> - % Finish view output - ok = rexi:stream_last(complete), - {ok, Acc}; -view_cb(ok, ddoc_updated) -> - rexi:reply({ok, ddoc_updated}). - - -reduce_cb({meta, Meta}, Acc) -> - % Map function starting - ok = rexi:stream2({meta, Meta}), - {ok, Acc}; -reduce_cb({row, Row}, Acc) -> - % Adding another row - ok = rexi:stream2(#view_row{ - key = couch_util:get_value(key, Row), - value = couch_util:get_value(value, Row) - }), - {ok, Acc}; -reduce_cb(complete, Acc) -> - % Finish view output - ok = rexi:stream_last(complete), - {ok, Acc}; -reduce_cb(ok, ddoc_updated) -> - rexi:reply({ok, ddoc_updated}). - - -changes_enumerator(#full_doc_info{} = FDI, Acc) -> - changes_enumerator(couch_doc:to_doc_info(FDI), Acc); -changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) -> - {ok, Acc#fabric_changes_acc{seq = Seq, pending = Acc#fabric_changes_acc.pending-1}}; -changes_enumerator(DocInfo, Acc) -> - #fabric_changes_acc{ - db = Db, - args = #changes_args{ - include_docs = IncludeDocs, - conflicts = Conflicts, - filter_fun = Filter, - doc_options = DocOptions - }, - pending = Pending, - epochs = Epochs - } = Acc, - #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo, - case [X || X <- couch_changes:filter(Db, DocInfo, Filter), X /= null] of - [] -> - ChangesRow = {no_pass, [ - {pending, Pending-1}, - {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}} - ]}; - Results -> - Opts = if Conflicts -> [conflicts | DocOptions]; true -> DocOptions end, - ChangesRow = {change, [ - {pending, Pending-1}, - {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}, - {id, Id}, - {changes, Results}, - {deleted, Del} | - if IncludeDocs -> [doc_member(Db, DocInfo, Opts, Filter)]; true -> [] end - ]} - end, - ok = rexi:stream2(ChangesRow), - {ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending-1}}. - -doc_member(Shard, DocInfo, Opts, Filter) -> - case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of - {ok, Doc} -> - {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}; - Error -> - Error - end. - -maybe_filtered_json_doc(Doc, Opts, {selector, _Style, {_Selector, Fields}}) - when Fields =/= nil -> - mango_fields:extract(couch_doc:to_json_obj(Doc, Opts), Fields); -maybe_filtered_json_doc(Doc, Opts, _Filter) -> - couch_doc:to_json_obj(Doc, Opts). - - -possible_ancestors(_FullInfo, []) -> - []; -possible_ancestors(FullInfo, MissingRevs) -> - #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo), - LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo], - % Find the revs that are possible parents of this rev - lists:foldl(fun({LeafPos, LeafRevId}, Acc) -> - % this leaf is a "possible ancenstor" of the missing - % revs if this LeafPos lessthan any of the missing revs - case lists:any(fun({MissingPos, _}) -> - LeafPos < MissingPos end, MissingRevs) of - true -> - [{LeafPos, LeafRevId} | Acc]; - false -> - Acc - end - end, [], LeafRevs). - -make_att_readers([]) -> - []; -make_att_readers([#doc{atts=Atts0} = Doc | Rest]) -> - % % go through the attachments looking for 'follows' in the data, - % % replace with function that reads the data from MIME stream. - Atts = [couch_att:transform(data, fun make_att_reader/1, Att) || Att <- Atts0], - [Doc#doc{atts = Atts} | make_att_readers(Rest)]. - -make_att_reader({follows, Parser, Ref}) -> - fun() -> - ParserRef = case get(mp_parser_ref) of - undefined -> - PRef = erlang:monitor(process, Parser), - put(mp_parser_ref, PRef), - PRef; - Else -> - Else - end, - Parser ! {get_bytes, Ref, self()}, - receive - {bytes, Ref, Bytes} -> - rexi:reply(attachment_chunk_received), - Bytes; - {'DOWN', ParserRef, _, _, Reason} -> - throw({mp_parser_died, Reason}) - end - end; -make_att_reader({fabric_attachment_receiver, Middleman, Length}) -> - fabric_doc_atts:receiver_callback(Middleman, Length); -make_att_reader(Else) -> - Else. - -clean_stack() -> - lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end, - erlang:get_stacktrace()). - -set_io_priority(DbName, Options) -> - case lists:keyfind(io_priority, 1, Options) of - {io_priority, Pri} -> - erlang:put(io_priority, Pri); - false -> - erlang:put(io_priority, {interactive, DbName}) - end, - case erlang:get(io_priority) of - {interactive, _} -> - case config:get("couchdb", "maintenance_mode", "false") of - "true" -> - % Done to silence error logging by rexi_server - rexi:reply({rexi_EXIT, {maintenance_mode, node()}}), - exit(normal); - _ -> - ok - end; - _ -> - ok - end. - - -calculate_start_seq(Db, Node, Seq) -> - case couch_db:calculate_start_seq(Db, Node, Seq) of - N when is_integer(N) -> - N; - {replace, OriginalNode, Uuid, OriginalSeq} -> - %% Scan history looking for an entry with - %% * target_node == TargetNode - %% * target_uuid == TargetUUID - %% * target_seq =< TargetSeq - %% If such an entry is found, stream from associated source_seq - mem3_rep:find_source_seq(Db, OriginalNode, Uuid, OriginalSeq) - end. - - -uuid(Db) -> - Uuid = couch_db:get_uuid(Db), - binary:part(Uuid, {0, uuid_prefix_len()}). - -uuid_prefix_len() -> - list_to_integer(config:get("fabric", "uuid_prefix_len", "7")). - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - -maybe_filtered_json_doc_no_filter_test() -> - Body = {[{<<"a">>, 1}]}, - Doc = #doc{id = <<"1">>, revs = {1, [<<"r1">>]}, body = Body}, - {JDocProps} = maybe_filtered_json_doc(Doc, [], x), - ExpectedProps = [{<<"_id">>, <<"1">>}, {<<"_rev">>, <<"1-r1">>}, {<<"a">>, 1}], - ?assertEqual(lists:keysort(1, JDocProps), ExpectedProps). - -maybe_filtered_json_doc_with_filter_test() -> - Body = {[{<<"a">>, 1}]}, - Doc = #doc{id = <<"1">>, revs = {1, [<<"r1">>]}, body = Body}, - Fields = [<<"a">>, <<"nonexistent">>], - Filter = {selector, main_only, {some_selector, Fields}}, - {JDocProps} = maybe_filtered_json_doc(Doc, [], Filter), - ?assertEqual(JDocProps, [{<<"a">>, 1}]). - --endif. |