author     Nick Vatamaniuc <vatamane@gmail.com>  2021-08-24 18:44:21 -0400
committer  Nick Vatamaniuc <nickva@users.noreply.github.com>  2021-08-26 17:56:01 -0400
commit     e83935c7f8c3e47b47f07f22ece327f6529d4da0 (patch)
tree       7530a947b7d4a4a87913d065fc14659f06f159e2 /src
parent     c632dd35f466719ffe87d6820eb1866dd2ea8560 (diff)
download   couchdb-e83935c7f8c3e47b47f07f22ece327f6529d4da0.tar.gz
Avoid change feed rewinds after shard moves
When shards are moved to new nodes and the user supplies a changes sequence from the old shard map configuration, attempt to match the missing nodes and ranges by inspecting the current shard uuids in order to avoid rewinds.

Previously, if a node and range were missing, we randomly picked a node in the appropriate range, so about 1/3 of the time we might have hit the exact node, but about 2/3 of the time we would end up with a complete changes feed rewind to 0.

Unfortunately, the uuid matching involves a fabric worker scatter-gather operation to all shard copies. However, it should only happen when we receive an old sequence, and we rely on that being rare: mostly right after shards have moved, after which users would get new sequences based on the recent shard map.
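In essence: each entry of an unpacked changes sequence carries the uuid of the shard it came from, and when a {Node, Range} pair from the old shard map no longer exists, that uuid is looked up against the uuids of the current shards. A simplified sketch of that lookup, with hypothetical node names, uuid value and module name (the real logic is replace_moved_shard/4 in fabric_view_changes.erl below; the sketch assumes mem3's #shard{} record):

-module(moved_shard_sketch).
-export([example/0]).

-include_lib("mem3/include/mem3.hrl").

%% An old sequence entry referenced 'db1@old' for the full range with shard
%% uuid prefix <<"a5f8c21">> (all values made up); that copy has since moved
%% to 'db3@new'. Matching by uuid lets the feed resume instead of rewinding.
example() ->
    OldNode = 'db1@old',
    OldRange = [0, 16#ffffffff],
    OldUuid = <<"a5f8c21">>,
    UuidShards = #{OldUuid => #shard{node = 'db3@new', range = OldRange}},
    case UuidShards of
        #{OldUuid := #shard{range = OldRange} = Shard} ->
            %% uuid and range both match: resume on the shard's new node
            Shard;
        #{} ->
            %% no match: keep the original placement and accept a possible rewind
            #shard{node = OldNode, range = OldRange}
    end.

If neither the uuid nor the range matches, the original placement is kept and the feed may still rewind, which is the best-effort behavior described above.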
Diffstat (limited to 'src')
-rw-r--r--  src/fabric/src/fabric.erl                                 |   7
-rw-r--r--  src/fabric/src/fabric_db_uuids.erl                        |  67
-rw-r--r--  src/fabric/src/fabric_ring.erl                            |  54
-rw-r--r--  src/fabric/src/fabric_rpc.erl                             |  10
-rw-r--r--  src/fabric/src/fabric_util.erl                            |  55
-rw-r--r--  src/fabric/src/fabric_view_changes.erl                    |  57
-rw-r--r--  src/fabric/test/eunit/fabric_db_uuids_tests.erl           |  51
-rw-r--r--  src/fabric/test/eunit/fabric_moved_shards_seq_tests.erl   | 122
8 files changed, 414 insertions, 9 deletions
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 34b967d23..638603437 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -37,7 +37,7 @@
% miscellany
-export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0,
cleanup_index_files/1, cleanup_index_files_all_nodes/1, dbname/1,
- inactive_index_files/1]).
+ inactive_index_files/1, db_uuids/1]).
-include_lib("fabric/include/fabric.hrl").
@@ -551,6 +551,11 @@ dbname(Db) ->
erlang:error({illegal_database_name, Db})
end.
+%% @doc Get the uuids of all shard copies of a database
+-spec db_uuids(dbname()) -> {ok, map()} | {error, any()}.
+db_uuids(DbName) ->
+ fabric_db_uuids:go(dbname(DbName)).
+
name(Thing) ->
couch_util:to_binary(Thing).
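A minimal usage sketch for the new export (the database name is hypothetical); per the eunit test further down, a successful call returns {ok, Map} where the keys are the full shard uuids, before any prefix trimming, and the values are mem3 #shard{} records:

%% In a remsh on a cluster node:
{ok, Uuids} = fabric:db_uuids(<<"mydb">>),
%% One entry per live shard copy (q * n), keyed by the full shard uuid
map_size(Uuids),
%% Each value identifies the node and range of the copy that owns that uuid
maps:keys(Uuids).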
diff --git a/src/fabric/src/fabric_db_uuids.erl b/src/fabric/src/fabric_db_uuids.erl
new file mode 100644
index 000000000..a440d74c2
--- /dev/null
+++ b/src/fabric/src/fabric_db_uuids.erl
@@ -0,0 +1,67 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_uuids).
+
+
+-export([go/1]).
+
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+go(DbName) when is_binary(DbName) ->
+ Shards = mem3:live_shards(DbName, [node() | nodes()]),
+ Workers = fabric_util:submit_jobs(Shards, get_uuid, []),
+ RexiMon = fabric_util:create_monitors(Shards),
+ Acc0 = {fabric_dict:init(Workers, nil), []},
+ try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {timeout, {WorkersDict, _}} ->
+ DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
+ fabric_util:log_timeout(DefunctWorkers, "db_uuids"),
+ {error, timeout};
+ Else ->
+ Else
+ after
+ rexi_monitor:stop(RexiMon)
+ end.
+
+
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {Cntrs, Res}) ->
+ case fabric_ring:node_down(NodeRef, Cntrs, Res, [all]) of
+ {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+ error -> {error, {nodedown, <<"progress not possible">>}}
+ end;
+
+handle_message({rexi_EXIT, Reason}, Shard, {Cntrs, Res}) ->
+ case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
+ {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+ error -> {error, Reason}
+ end;
+
+handle_message(Uuid, Shard, {Cntrs, Res}) when is_binary(Uuid) ->
+ case fabric_ring:handle_response(Shard, Uuid, Cntrs, Res, [all]) of
+ {ok, {Cntrs1, Res1}} ->
+ {ok, {Cntrs1, Res1}};
+ {stop, Res1} ->
+ Uuids = fabric_dict:fold(fun(#shard{} = S, Id, #{} = Acc) ->
+ Acc#{Id => S#shard{ref = undefined}}
+ end, #{}, Res1),
+ {stop, Uuids}
+ end;
+
+handle_message(Reason, Shard, {Cntrs, Res}) ->
+ case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
+ {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+ error -> {error, Reason}
+ end.
diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl
index 110edb9ab..bad0f42d1 100644
--- a/src/fabric/src/fabric_ring.erl
+++ b/src/fabric/src/fabric_ring.erl
@@ -122,6 +122,9 @@ handle_response(Shard, Response, Workers, Responses, RingOpts) ->
% a partitioned database. As soon as a result from any of the shards
% arrives, result collection stops.
%
+% * When RingOpts is [all], responses are accepted until all the shards return
+% results
+%
handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
Workers1 = fabric_dict:erase(Shard, Workers),
case RingOpts of
@@ -130,7 +133,10 @@ handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
Responses1 = [{{B, E}, Shard, Response} | Responses],
handle_response_ring(Workers1, Responses1, CleanupCb);
[{any, Any}] ->
- handle_response_any(Shard, Response, Workers1, Any, CleanupCb)
+ handle_response_any(Shard, Response, Workers1, Any, CleanupCb);
+ [all] ->
+ Responses1 = [{Shard, Response} | Responses],
+ handle_response_all(Workers1, Responses1)
end.
@@ -164,6 +170,15 @@ handle_response_any(Shard, Response, Workers, Any, CleanupCb) ->
end.
+handle_response_all(Workers, Responses) ->
+ case fabric_dict:size(Workers) =:= 0 of
+ true ->
+ {stop, fabric_dict:from_list(Responses)};
+ false ->
+ {ok, {Workers, Responses}}
+ end.
+
+
% Check if workers are still waiting and the already received responses could
% still form a continuous range. The range won't always be the full ring, and
% the bounds are computed based on the minimum and maximum interval beginning
@@ -186,6 +201,9 @@ is_progress_possible(Counters, Responses, MinB, MaxE, []) ->
Ranges = fabric_util:worker_ranges(Counters) ++ ResponseRanges,
mem3_util:get_ring(Ranges, MinB, MaxE) =/= [];
+is_progress_possible(Counters, _Responses, _, _, [all]) ->
+ fabric_dict:size(Counters) > 0;
+
is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) ->
InAny = fun(S) -> lists:member(S#shard{ref = undefined}, AnyShards) end,
case fabric_dict:filter(fun(S, _) -> InAny(S) end, Counters) of
@@ -305,7 +323,7 @@ is_progress_possible_with_responses_test() ->
?assertEqual(true, is_progress_possible([], RS1, 7, 8, [])).
-is_progress_possible_with_ring_opts_test() ->
+is_progress_possible_with_ring_opts_any_test() ->
Opts = [{any, [mk_shard("n1", [0, 5]), mk_shard("n2", [3, 10])]}],
C1 = [{mk_shard("n1", [0, ?RING_END]), nil}],
RS1 = mk_resps([{"n1", 3, 10, 42}]),
@@ -323,6 +341,12 @@ is_progress_possible_with_ring_opts_test() ->
?assertEqual(true, is_progress_possible(C2, [], 0, ?RING_END, Opts)).
+is_progress_possible_with_ring_opts_all_test() ->
+ C1 = [{mk_shard("n1", [0, ?RING_END]), nil}],
+ ?assertEqual(true, is_progress_possible(C1, [], 0, ?RING_END, [all])),
+ ?assertEqual(false, is_progress_possible([], [], 0, ?RING_END, [all])).
+
+
get_shard_replacements_test() ->
Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [
{"n1", 11, 20}, {"n1", 21, ?RING_END},
@@ -422,7 +446,7 @@ handle_response_backtracking_test() ->
?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4).
-handle_response_ring_opts_test() ->
+handle_response_ring_opts_any_test() ->
Shard1 = mk_shard("n1", [0, 5]),
Shard2 = mk_shard("n2", [0, 1]),
Shard3 = mk_shard("n3", [0, 1]),
@@ -446,6 +470,30 @@ handle_response_ring_opts_test() ->
?assertEqual({stop, [{Shard3, 44}]}, Result3).
+handle_response_ring_opts_all_test() ->
+ Shard1 = mk_shard("n1", [0, 5]),
+ Shard2 = mk_shard("n2", [0, 1]),
+ Shard3 = mk_shard("n3", [0, 1]),
+
+ ShardList = [Shard1, Shard2, Shard3],
+ [W1, W2, W3] = WithRefs = [S#shard{ref = make_ref()} || S <- ShardList],
+ Workers1 = fabric_dict:init(WithRefs, nil),
+
+ Result1 = handle_response(W1, 42, Workers1, [], [all], undefined),
+ ?assertMatch({ok, {_, _}}, Result1),
+ {ok, {Workers2, _}} = Result1,
+
+    % Even though n2 and n3 cover the same range, with the [all] option we wait for
+ % all workers to return.
+ Result2 = handle_response(W2, 43, Workers2, [], [all], undefined),
+ ?assertMatch({ok, {_, _}}, Result2),
+ {ok, {Workers3, _}} = Result2,
+
+ % Stop only after all the shards respond
+ Result3 = handle_response(W3, 44, Workers3, [], [all], undefined),
+ ?assertMatch({stop, [_ | _]}, Result3).
+
+
handle_error_test() ->
Shard1 = mk_shard("n1", [0, 5]),
Shard2 = mk_shard("n1", [10, ?RING_END]),
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 9ed8efd14..4330f92be 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -26,7 +26,7 @@
-export([get_db_info/2, get_doc_count/2, get_design_doc_count/2,
get_update_seq/2, changes/4, map_view/5, reduce_view/5,
- group_info/3, update_mrview/4]).
+ group_info/3, update_mrview/4, get_uuid/1]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("couch/include/couch_db.hrl").
@@ -318,6 +318,9 @@ compact(ShardName, DesignName) ->
Ref = erlang:make_ref(),
Pid ! {'$gen_call', {self(), Ref}, compact}.
+get_uuid(DbName) ->
+ with_db(DbName, [], {couch_db, get_uuid, []}).
+
%%
%% internal
%%
@@ -637,10 +640,9 @@ calculate_start_seq(Db, Node, Seq) ->
uuid(Db) ->
Uuid = couch_db:get_uuid(Db),
- binary:part(Uuid, {0, uuid_prefix_len()}).
+ Prefix = fabric_util:get_uuid_prefix_len(),
+ binary:part(Uuid, {0, Prefix}).
-uuid_prefix_len() ->
- list_to_integer(config:get("fabric", "uuid_prefix_len", "7")).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 84ffef122..9dd8e71fa 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -23,6 +23,9 @@
-export([validate_all_docs_args/2, validate_args/3]).
-export([upgrade_mrargs/1]).
-export([worker_ranges/1]).
+-export([get_uuid_prefix_len/0]).
+-export([isolate/1, isolate/2]).
+
-compile({inline, [{doc_id_and_rev,1}]}).
@@ -345,3 +348,55 @@ worker_ranges(Workers) ->
[{X, Y} | Acc]
end, [], Workers),
lists:usort(Ranges).
+
+
+get_uuid_prefix_len() ->
+ config:get_integer("fabric", "uuid_prefix_len", 7).
+
+
+% If we issue multiple fabric calls from the same process we have to isolate
+% them so that, in case of an error, they don't pollute the process dictionary
+% or the mailbox.
+
+isolate(Fun) ->
+ isolate(Fun, infinity).
+
+
+isolate(Fun, Timeout) ->
+ {Pid, Ref} = erlang:spawn_monitor(fun() -> exit(do_isolate(Fun)) end),
+ receive
+ {'DOWN', Ref, _, _, {'$isolres', Res}} ->
+ Res;
+ {'DOWN', Ref, _, _, {'$isolerr', Tag, Reason, Stack}} ->
+ erlang:raise(Tag, Reason, Stack)
+ after Timeout ->
+ erlang:demonitor(Ref, [flush]),
+ exit(Pid, kill),
+ erlang:error(timeout)
+ end.
+
+
+% OTP_RELEASE is defined in OTP 21+ only
+-ifdef(OTP_RELEASE).
+
+
+do_isolate(Fun) ->
+ try
+ {'$isolres', Fun()}
+ catch Tag:Reason:Stack ->
+ {'$isolerr', Tag, Reason, Stack}
+ end.
+
+
+-else.
+
+
+do_isolate(Fun) ->
+ try
+ {'$isolres', Fun()}
+ catch ?STACKTRACE(Tag, Reason, Stack)
+ {'$isolerr', Tag, Reason, Stack}
+ end.
+
+
+-endif.
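A usage sketch for isolate/2 (DbName is a placeholder): the wrapped fabric call runs in a spawned, monitored process, so a crash or timeout cannot leave stray rexi messages in the caller's mailbox; exceptions are re-raised in the caller, and a timeout kills the worker and raises error:timeout. This mirrors how get_db_uuid_shards/1 uses it further below.

%% Run a nested fabric call in a throwaway process, capped at 60 seconds
Timeout = 60000,
try fabric_util:isolate(fun() -> fabric:db_uuids(DbName) end, Timeout) of
    {ok, Uuids} ->
        Uuids;
    {error, Reason} ->
        couch_log:error("db_uuids failed: ~p", [Reason]),
        #{}
catch
    error:timeout ->
        %% isolate/2 killed the worker after Timeout milliseconds
        #{};
    Tag:Err ->
        %% an exception raised inside the fun is re-raised here
        couch_log:error("db_uuids failed: ~p:~p", [Tag, Err]),
        #{}
end.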
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index febbd3169..beeaecee1 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -453,12 +453,14 @@ do_unpack_seqs(Opaque, DbName) ->
true ->
Unpacked;
false ->
+ Uuids = get_db_uuid_shards(DbName),
PotentialWorkers = lists:map(fun({Node, [A, B], Seq}) ->
case mem3:get_shard(DbName, Node, [A, B]) of
{ok, Shard} ->
{Shard, Seq};
{error, not_found} ->
- {#shard{node = Node, range = [A, B]}, Seq}
+ Shard = replace_moved_shard(Node, [A, B], Seq, Uuids),
+ {Shard, Seq}
end
end, Deduped),
Shards = mem3:shards(DbName),
@@ -495,6 +497,59 @@ get_old_seq(#shard{range=R}=Shard, SinceSeqs) ->
end.
+get_db_uuid_shards(DbName) ->
+ % Need to use an isolated process as we are performing a fabric call from
+    % another fabric call and there is a good chance we'd pollute the mailbox
+ % with returned messages
+ Timeout = fabric_util:request_timeout(),
+ IsolatedFun = fun() -> fabric:db_uuids(DbName) end,
+ try fabric_util:isolate(IsolatedFun, Timeout) of
+ {ok, Uuids} ->
+ % Trim uuids so we match exactly based on the currently configured
+ % uuid_prefix_len. The assumption is that we are getting an older
+ % sequence from the same cluster and we didn't tweak that
+ % relatively obscure config option in the meantime.
+ PrefixLen = fabric_util:get_uuid_prefix_len(),
+ maps:fold(fun(Uuid, Shard, Acc) ->
+ TrimmedUuid = binary:part(Uuid, {0, PrefixLen}),
+ Acc#{TrimmedUuid => Shard}
+ end, #{}, Uuids);
+ {error, Error} ->
+ % Since we are doing a best-effort approach to match moved shards,
+ % tolerate and log errors. This should also handle cases when the
+ % cluster is partially upgraded, as some nodes will not have the
+ % newer get_uuid fabric_rpc handler.
+ ErrMsg = "~p : could not get db_uuids for Db:~p Error:~p",
+ couch_log:error(ErrMsg, [?MODULE, DbName, Error]),
+ #{}
+ catch
+ _Tag:Error ->
+ ErrMsg = "~p : could not get db_uuids for Db:~p Error:~p",
+ couch_log:error(ErrMsg, [?MODULE, DbName, Error]),
+ #{}
+ end.
+
+
+%% Determine if the missing shard moved to a new node. Do that by matching the
+%% uuids from the current shard map. If we cannot find a moved shard, we return
+%% the original node and range as a shard and hope for the best.
+replace_moved_shard(Node, Range, Seq, #{} = _UuidShards) when is_number(Seq) ->
+ % Cannot figure out shard moves without uuid matching
+ #shard{node = Node, range = Range};
+replace_moved_shard(Node, Range, {Seq, Uuid}, #{} = UuidShards) ->
+ % Compatibility case for an old seq format which didn't have epoch nodes
+ replace_moved_shard(Node, Range, {Seq, Uuid, Node}, UuidShards);
+replace_moved_shard(Node, Range, {_Seq, Uuid, _EpochNode}, #{} = UuidShards) ->
+ case UuidShards of
+ #{Uuid := #shard{range = Range} = Shard} ->
+ % Found a moved shard by matching both the uuid and the range
+ Shard;
+ #{} ->
+ % Did not find a moved shard, use the original node
+ #shard{node = Node, range = Range}
+ end.
+
+
changes_row(Props0) ->
Props1 = case couch_util:get_value(deleted, Props0) of
true ->
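The prefix trimming in get_db_uuid_shards/1 is a plain binary:part/2 with the configured length; a worked sketch with a made-up uuid, assuming the default fabric.uuid_prefix_len of 7:

%% Full shard uuid as returned by couch_db:get_uuid/1 (value made up)
FullUuid = <<"a5f8c21d9e04b7c63f12ab88e90c4d55">>,
PrefixLen = fabric_util:get_uuid_prefix_len(),  % 7 unless reconfigured
Trimmed = binary:part(FullUuid, {0, PrefixLen}),
%% With the default length this yields <<"a5f8c21">>, the same truncated form
%% fabric_rpc:uuid/1 packs into change sequences, so old-sequence uuids and
%% current shard uuids compare directly in replace_moved_shard/4.
Trimmed.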
diff --git a/src/fabric/test/eunit/fabric_db_uuids_tests.erl b/src/fabric/test/eunit/fabric_db_uuids_tests.erl
new file mode 100644
index 000000000..986945b52
--- /dev/null
+++ b/src/fabric/test/eunit/fabric_db_uuids_tests.erl
@@ -0,0 +1,51 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_uuids_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+-define(TDEF(A), {atom_to_list(A), fun A/0}).
+
+main_test_() ->
+ {
+ setup,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF(t_can_get_shard_uuids)
+ ]
+ }.
+
+setup() ->
+ test_util:start_couch([fabric]).
+
+teardown(Ctx) ->
+ meck:unload(),
+ test_util:stop_couch(Ctx).
+
+t_can_get_shard_uuids() ->
+ DbName = ?tempdb(),
+ ok = fabric:create_db(DbName, []),
+ Shards = mem3:shards(DbName),
+ {ok, Uuids} = fabric:db_uuids(DbName),
+ ?assertEqual(length(Shards), map_size(Uuids)),
+ UuidsFromShards = lists:foldl(fun(#shard{} = Shard, Acc) ->
+ Uuid = couch_util:with_db(Shard#shard.name, fun(Db) ->
+ couch_db:get_uuid(Db)
+ end),
+ Acc#{Uuid => Shard}
+ end, #{}, Shards),
+ ?assertEqual(UuidsFromShards, Uuids),
+ ok = fabric:delete_db(DbName, []).
diff --git a/src/fabric/test/eunit/fabric_moved_shards_seq_tests.erl b/src/fabric/test/eunit/fabric_moved_shards_seq_tests.erl
new file mode 100644
index 000000000..a78d17ab7
--- /dev/null
+++ b/src/fabric/test/eunit/fabric_moved_shards_seq_tests.erl
@@ -0,0 +1,122 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_moved_shards_seq_tests).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+-define(TDEF(A), {atom_to_list(A), fun A/0}).
+
+
+main_test_() ->
+ {
+ setup,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF(t_shard_moves_avoid_sequence_rewinds)
+ ]
+ }.
+
+
+setup() ->
+ test_util:start_couch([fabric]).
+
+
+teardown(Ctx) ->
+ meck:unload(),
+ test_util:stop_couch(Ctx).
+
+
+t_shard_moves_avoid_sequence_rewinds() ->
+ DocCnt = 30,
+ DbName = ?tempdb(),
+
+ ok = fabric:create_db(DbName, [{q,1}, {n,1}]),
+ lists:foreach(fun(I) ->
+ update_doc(DbName, #doc{id = erlang:integer_to_binary(I)})
+ end, lists:seq(1, DocCnt)),
+
+ {ok, _, Seq1, 0} = changes(DbName, #changes_args{limit = 1, since = "now"}),
+ [{_, Range, {Seq, Uuid, _}}] = seq_decode(Seq1),
+
+ % Transform Seq1 pretending it came from a fake source node, before the
+ % shard was moved to the current node.
+ SrcNode = 'srcnode@srchost',
+ Seq2 = seq_encode([{SrcNode, Range, {Seq, Uuid, SrcNode}}]),
+
+    % First, check the case when the shard file epoch is mismatched and the
+ % sequence would rewind. This ensures the epoch and uuid check protection
+ % in couch_db works as intended.
+ Result1 = changes(DbName, #changes_args{limit = 1, since = Seq2}),
+ ?assertMatch({ok, _, _, _}, Result1),
+ {ok, _, _, PendingRewind} = Result1,
+ ?assertEqual(DocCnt - 1, PendingRewind),
+
+    % Mock epoch checking to pretend that the shard actually used to live on
+ % SrcNode. In this case, we should not have rewinds.
+ mock_epochs([{node(), DocCnt}, {SrcNode, 1}]),
+ Result2 = changes(DbName, #changes_args{limit = 1, since = Seq2}),
+ ?assertMatch({ok, _, _, _}, Result2),
+ {ok, _, _, PendingNoRewind} = Result2,
+ ?assertEqual(0, PendingNoRewind),
+
+ ok = fabric:delete_db(DbName, []).
+
+
+changes_callback(start, Acc) ->
+ {ok, Acc};
+
+changes_callback({change, {Change}}, Acc) ->
+ CM = maps:from_list(Change),
+ {ok, [CM | Acc]};
+
+changes_callback({stop, EndSeq, Pending}, Acc) ->
+ {ok, Acc, EndSeq, Pending}.
+
+
+changes(DbName, #changes_args{} = Args) ->
+ fabric_util:isolate(fun() ->
+ fabric:changes(DbName, fun changes_callback/2, [], Args)
+ end).
+
+
+update_doc(DbName, #doc{} = Doc) ->
+ fabric_util:isolate(fun() ->
+ case fabric:update_doc(DbName, Doc, [?ADMIN_CTX]) of
+ {ok, Res} -> Res
+ end
+ end).
+
+
+seq_decode(Seq) ->
+ % This is copied from fabric_view_changes
+ Pattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$",
+ Options = [{capture, [opaque], binary}],
+ {match, Seq1} = re:run(Seq, Pattern, Options),
+ binary_to_term(couch_util:decodeBase64Url(Seq1)).
+
+
+seq_encode(Unpacked) ->
+ % Copied from fabric_view_changes
+ Opaque = couch_util:encodeBase64Url(term_to_binary(Unpacked, [compressed])),
+ ?l2b(["30", $-, Opaque]).
+
+
+mock_epochs(Epochs) ->
+ % Since we made up a node name we'll have to mock epoch checking
+ meck:new(couch_db_engine, [passthrough]),
+ meck:expect(couch_db_engine, get_epochs, fun(_) -> Epochs end).