From b8e648df5b67146c5fcccbcf0256a719dfdcd9a1 Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Tue, 24 Aug 2021 18:44:21 -0400 Subject: Avoid change feed rewinds after shard moves When shards are moved to new nodes, and the user supplies a change sequence from the old shard map configuration, attempt to match missing nodes and ranges by inspecting current shard uuids in order to avoid rewinds. Previously, if a node and range was missing, we randomly picked a node in the appropriate range, so 1/3 of the time we might have hit the exact node, but 2/3 of the time we would end up with a complete changes feed rewind to 0. Unfortunately, this involves a fabric worker scatter gather operation to all shard copies. This should only happen when we get an old sequence. We rely on that happening rarely, mostly right after the shards moved, then users would get new sequence from the recent shard map. --- src/fabric/src/fabric.erl | 7 +- src/fabric/src/fabric_db_uuids.erl | 67 +++++++++++ src/fabric/src/fabric_ring.erl | 54 ++++++++- src/fabric/src/fabric_rpc.erl | 5 +- src/fabric/src/fabric_util.erl | 49 ++++++++ src/fabric/src/fabric_view_changes.erl | 57 +++++++++- src/fabric/test/eunit/fabric_db_uuids_tests.erl | 57 ++++++++++ .../test/eunit/fabric_moved_shards_seq_tests.erl | 123 +++++++++++++++++++++ 8 files changed, 413 insertions(+), 6 deletions(-) create mode 100644 src/fabric/src/fabric_db_uuids.erl create mode 100644 src/fabric/test/eunit/fabric_db_uuids_tests.erl create mode 100644 src/fabric/test/eunit/fabric_moved_shards_seq_tests.erl diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl index 34b967d23..638603437 100644 --- a/src/fabric/src/fabric.erl +++ b/src/fabric/src/fabric.erl @@ -37,7 +37,7 @@ % miscellany -export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0, cleanup_index_files/1, cleanup_index_files_all_nodes/1, dbname/1, - inactive_index_files/1]). + inactive_index_files/1, db_uuids/1]). 
-include_lib("fabric/include/fabric.hrl"). @@ -551,6 +551,11 @@ dbname(Db) -> erlang:error({illegal_database_name, Db}) end. +%% @doc get db shard uuids +-spec db_uuids(dbname()) -> map(). +db_uuids(DbName) -> + fabric_db_uuids:go(dbname(DbName)). + name(Thing) -> couch_util:to_binary(Thing). diff --git a/src/fabric/src/fabric_db_uuids.erl b/src/fabric/src/fabric_db_uuids.erl new file mode 100644 index 000000000..8eb0aaed8 --- /dev/null +++ b/src/fabric/src/fabric_db_uuids.erl @@ -0,0 +1,67 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_db_uuids). + + +-export([go/1]). + + +-include_lib("fabric/include/fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + + +go(DbName) when is_binary(DbName) -> + Shards = mem3:live_shards(DbName, [node() | nodes()]), + Workers = fabric_util:submit_jobs(Shards, get_uuid, []), + RexiMon = fabric_util:create_monitors(Shards), + Acc0 = {fabric_dict:init(Workers, nil), []}, + try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {timeout, {WorkersDict, _}} -> + DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil), + fabric_util:log_timeout(DefunctWorkers, "db_uuids"), + {error, timeout}; + Else -> + Else + after + rexi_monitor:stop(RexiMon) + end. 
+ + +handle_message({rexi_DOWN, _, {_, NodeRef},_}, _Shard, {Cntrs, Res}) -> + case fabric_ring:node_down(NodeRef, Cntrs, Res, [all]) of + {ok, Cntrs1} -> {ok, {Cntrs1, Res}}; + error -> {error, {nodedown, <<"progress not possible">>}} + end; + +handle_message({rexi_EXIT, Reason}, Shard, {Cntrs, Res}) -> + case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of + {ok, Cntrs1} -> {ok, {Cntrs1, Res}}; + error -> {error, Reason} + end; + +handle_message(Uuid, Shard, {Cntrs, Res}) when is_binary(Uuid) -> + case fabric_ring:handle_response(Shard, Uuid, Cntrs, Res, [all]) of + {ok, {Cntrs1, Res1}} -> + {ok, {Cntrs1, Res1}}; + {stop, Res1} -> + Uuids = fabric_dict:fold(fun(#shard{} = S, Id, #{} = Acc) -> + Acc#{Id => S#shard{ref = undefined}} + end, #{}, Res1), + {stop, Uuids} + end; + +handle_message(Reason, Shard, {Cntrs, Res}) -> + case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of + {ok, Cntrs1} -> {ok, {Cntrs1, Res}}; + error -> {error, Reason} + end. diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl index 110edb9ab..bad0f42d1 100644 --- a/src/fabric/src/fabric_ring.erl +++ b/src/fabric/src/fabric_ring.erl @@ -122,6 +122,9 @@ handle_response(Shard, Response, Workers, Responses, RingOpts) -> % a partitioned database. As soon as a result from any of the shards % arrives, result collection stops. 
% +% * When RingOpts is [all], responses are accepted until all the shards return +% results +% handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) -> Workers1 = fabric_dict:erase(Shard, Workers), case RingOpts of @@ -130,7 +133,10 @@ handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) -> Responses1 = [{{B, E}, Shard, Response} | Responses], handle_response_ring(Workers1, Responses1, CleanupCb); [{any, Any}] -> - handle_response_any(Shard, Response, Workers1, Any, CleanupCb) + handle_response_any(Shard, Response, Workers1, Any, CleanupCb); + [all] -> + Responses1 = [{Shard, Response} | Responses], + handle_response_all(Workers1, Responses1) end. @@ -164,6 +170,15 @@ handle_response_any(Shard, Response, Workers, Any, CleanupCb) -> end. +handle_response_all(Workers, Responses) -> + case fabric_dict:size(Workers) =:= 0 of + true -> + {stop, fabric_dict:from_list(Responses)}; + false -> + {ok, {Workers, Responses}} + end. + + % Check if workers still waiting and the already received responses could % still form a continous range. The range won't always be the full ring, and % the bounds are computed based on the minimum and maximum interval beginning @@ -186,6 +201,9 @@ is_progress_possible(Counters, Responses, MinB, MaxE, []) -> Ranges = fabric_util:worker_ranges(Counters) ++ ResponseRanges, mem3_util:get_ring(Ranges, MinB, MaxE) =/= []; +is_progress_possible(Counters, _Responses, _, _, [all]) -> + fabric_dict:size(Counters) > 0; + is_progress_possible(Counters, Responses, _, _, [{any, AnyShards}]) -> InAny = fun(S) -> lists:member(S#shard{ref = undefined}, AnyShards) end, case fabric_dict:filter(fun(S, _) -> InAny(S) end, Counters) of @@ -305,7 +323,7 @@ is_progress_possible_with_responses_test() -> ?assertEqual(true, is_progress_possible([], RS1, 7, 8, [])). 
-is_progress_possible_with_ring_opts_test() -> +is_progress_possible_with_ring_opts_any_test() -> Opts = [{any, [mk_shard("n1", [0, 5]), mk_shard("n2", [3, 10])]}], C1 = [{mk_shard("n1", [0, ?RING_END]), nil}], RS1 = mk_resps([{"n1", 3, 10, 42}]), @@ -323,6 +341,12 @@ is_progress_possible_with_ring_opts_test() -> ?assertEqual(true, is_progress_possible(C2, [], 0, ?RING_END, Opts)). +is_progress_possible_with_ring_opts_all_test() -> + C1 = [{mk_shard("n1", [0, ?RING_END]), nil}], + ?assertEqual(true, is_progress_possible(C1, [], 0, ?RING_END, [all])), + ?assertEqual(false, is_progress_possible([], [], 0, ?RING_END, [all])). + + get_shard_replacements_test() -> Unused = [mk_shard(N, [B, E]) || {N, B, E} <- [ {"n1", 11, 20}, {"n1", 21, ?RING_END}, @@ -422,7 +446,7 @@ handle_response_backtracking_test() -> ?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4). -handle_response_ring_opts_test() -> +handle_response_ring_opts_any_test() -> Shard1 = mk_shard("n1", [0, 5]), Shard2 = mk_shard("n2", [0, 1]), Shard3 = mk_shard("n3", [0, 1]), @@ -446,6 +470,30 @@ handle_response_ring_opts_test() -> ?assertEqual({stop, [{Shard3, 44}]}, Result3). +handle_response_ring_opts_all_test() -> + Shard1 = mk_shard("n1", [0, 5]), + Shard2 = mk_shard("n2", [0, 1]), + Shard3 = mk_shard("n3", [0, 1]), + + ShardList = [Shard1, Shard2, Shard3], + [W1, W2, W3] = WithRefs = [S#shard{ref = make_ref()} || S <- ShardList], + Workers1 = fabric_dict:init(WithRefs, nil), + + Result1 = handle_response(W1, 42, Workers1, [], [all], undefined), + ?assertMatch({ok, {_, _}}, Result1), + {ok, {Workers2, _}} = Result1, + + % Even though n2 and n3 cover the same range, with 'all' option we wait for + % all workers to return. 
+ Result2 = handle_response(W2, 43, Workers2, [], [all], undefined), + ?assertMatch({ok, {_, _}}, Result2), + {ok, {Workers3, _}} = Result2, + + % Stop only after all the shards respond + Result3 = handle_response(W3, 44, Workers3, [], [all], undefined), + ?assertMatch({stop, [_ | _]}, Result3). + + handle_error_test() -> Shard1 = mk_shard("n1", [0, 5]), Shard2 = mk_shard("n1", [10, ?RING_END]), diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 9ed8efd14..3f8756afe 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -26,7 +26,7 @@ -export([get_db_info/2, get_doc_count/2, get_design_doc_count/2, get_update_seq/2, changes/4, map_view/5, reduce_view/5, - group_info/3, update_mrview/4]). + group_info/3, update_mrview/4, get_uuid/1]). -include_lib("fabric/include/fabric.hrl"). -include_lib("couch/include/couch_db.hrl"). @@ -318,6 +318,9 @@ compact(ShardName, DesignName) -> Ref = erlang:make_ref(), Pid ! {'$gen_call', {self(), Ref}, compact}. +get_uuid(DbName) -> + with_db(DbName, [], {couch_db, get_uuid, []}). + %% %% internal %% diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 84ffef122..e340ced55 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -23,6 +23,7 @@ -export([validate_all_docs_args/2, validate_args/3]). -export([upgrade_mrargs/1]). -export([worker_ranges/1]). +-export([isolate/1, isolate/2]). -compile({inline, [{doc_id_and_rev,1}]}). @@ -345,3 +346,51 @@ worker_ranges(Workers) -> [{X, Y} | Acc] end, [], Workers), lists:usort(Ranges). + + +% If we issue multiple fabric calls from the same process we have to isolate +% them so in case of error they don't pollute the processes dictionary or the +% mailbox + +isolate(Fun) -> + isolate(Fun, infinity). 
+ + +isolate(Fun, Timeout) -> + {Pid, Ref} = erlang:spawn_monitor(fun() -> exit(do_isolate(Fun)) end), + receive + {'DOWN', Ref, _, _, {'$isolres', Res}} -> + Res; + {'DOWN', Ref, _, _, {'$isolerr', Tag, Reason, Stack}} -> + erlang:raise(Tag, Reason, Stack) + after Timeout -> + erlang:demonitor(Ref, [flush]), + exit(Pid, kill), + erlang:error(timeout) + end. + + +% OTP_RELEASE is defined in OTP 21+ only +-ifdef(OTP_RELEASE). + + +do_isolate(Fun) -> + try + {'$isolres', Fun()} + catch Tag:Reason:Stack -> + {'$isolerr', Tag, Reason, Stack} + end. + + +-else. + + +do_isolate(Fun) -> + try + {'$isolres', Fun()} + catch ?STACKTRACE(Tag, Reason, Stack) + {'$isolerr', Tag, Reason, Stack} + end. + + +-endif. diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl index febbd3169..8d391bc68 100644 --- a/src/fabric/src/fabric_view_changes.erl +++ b/src/fabric/src/fabric_view_changes.erl @@ -453,12 +453,14 @@ do_unpack_seqs(Opaque, DbName) -> true -> Unpacked; false -> + Uuids = get_db_uuids(DbName), PotentialWorkers = lists:map(fun({Node, [A, B], Seq}) -> case mem3:get_shard(DbName, Node, [A, B]) of {ok, Shard} -> {Shard, Seq}; {error, not_found} -> - {#shard{node = Node, range = [A, B]}, Seq} + Shard = replace_moved_shard(Node, [A, B], Seq, Uuids), + {Shard, Seq} end end, Deduped), Shards = mem3:shards(DbName), @@ -495,6 +497,59 @@ get_old_seq(#shard{range=R}=Shard, SinceSeqs) -> end. +get_db_uuids(DbName) -> + % Need to use an isolated process as we are performing a fabric call from + % another fabric call and there is a good chance we'd pollute the mailbox + % with returned messages + Timeout = fabric_util:request_timeout(), + IsolatedFun = fun() -> fabric:db_uuids(DbName) end, + try fabric_util:isolate(IsolatedFun, Timeout) of + {ok, Uuids} -> + % Trim uuids so we match exactly based on the currently configured + % uuid_prefix_len.
The assumption is that we are getting an older + % sequence from the same cluster and we didn't tweak that + % relatively obscure config option in the meantime. + PrefixLen = config:get_integer("fabric", "uuid_prefix_len", 7), + maps:fold(fun(Uuid, Shard, Acc) -> + TrimmedUuid = binary:part(Uuid, {0, PrefixLen}), + Acc#{TrimmedUuid => Shard} + end, #{}, Uuids); + {error, Error} -> + % Since we are doing a best-effort approach to match moved shards, + % tolerate and log errors. This should also handle cases when the + % cluster is partially upgraded, as some nodes will not have the + % newer get_uuid fabric_rpc handler. + ErrMsg = "~p : could not get db_uuids for Db:~p Error:~p", + couch_log:error(ErrMsg, [?MODULE, DbName, Error]), + #{} + catch + _Tag:Error -> + ErrMsg = "~p : could not get db_uuids for Db:~p Error:~p", + couch_log:error(ErrMsg, [?MODULE, DbName, Error]), + #{} + end. + + +%% Determine if the missing shard moved to a new node. Do that by matching the +%% uuids from the current shard map. If we cannot find a moved shard, we return +%% the original node and range as a shard and hope for the best. +replace_moved_shard(Node, Range, Seq, #{} = _Uuids) when is_number(Seq) -> + % Cannot figure out shard moves without uuid matching + #shard{node = Node, range = Range}; +replace_moved_shard(Node, Range, {Seq, Uuid}, #{} = Uuids) -> + % Compatibility case for an old seq format which didn't have epoch nodes + replace_moved_shard(Node, Range, {Seq, Uuid, Node}, Uuids); +replace_moved_shard(Node, Range, {_Seq, Uuid, _EpochNode}, #{} = Uuids) -> + case Uuids of + #{Uuid := #shard{range = Range} = Shard} -> + % Found a moved shard by matching both the uuid and the range + Shard; + #{} -> + % Did not find a moved shard, use the original node + #shard{node = Node, range = Range} + end.
+ + changes_row(Props0) -> Props1 = case couch_util:get_value(deleted, Props0) of true -> diff --git a/src/fabric/test/eunit/fabric_db_uuids_tests.erl b/src/fabric/test/eunit/fabric_db_uuids_tests.erl new file mode 100644 index 000000000..656ab3b87 --- /dev/null +++ b/src/fabric/test/eunit/fabric_db_uuids_tests.erl @@ -0,0 +1,57 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_db_uuids_tests). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/include/mem3.hrl"). + + +-define(TDEF(A), {atom_to_list(A), fun A/0}). + + +main_test_() -> + { + setup, + fun setup/0, + fun teardown/1, + [ + ?TDEF(t_can_get_shard_uuids) + ] + }. + + +setup() -> + test_util:start_couch([fabric]). + + +teardown(Ctx) -> + meck:unload(), + test_util:stop_couch(Ctx). + + +t_can_get_shard_uuids() -> + DbName = ?tempdb(), + ok = fabric:create_db(DbName, []), + Shards = mem3:shards(DbName), + {ok, Uuids} = fabric:db_uuids(DbName), + ?assertEqual(length(Shards), map_size(Uuids)), + UuidsFromShards = lists:foldl(fun(#shard{} = Shard, Acc) -> + Uuid = couch_util:with_db(Shard#shard.name, fun(Db) -> + couch_db:get_uuid(Db) + end), + Acc#{Uuid => Shard} + end, #{}, Shards), + ?assertEqual(UuidsFromShards, Uuids), + ok = fabric:delete_db(DbName, []). 
diff --git a/src/fabric/test/eunit/fabric_moved_shards_seq_tests.erl b/src/fabric/test/eunit/fabric_moved_shards_seq_tests.erl new file mode 100644 index 000000000..79ca37ba1 --- /dev/null +++ b/src/fabric/test/eunit/fabric_moved_shards_seq_tests.erl @@ -0,0 +1,123 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_moved_shards_seq_tests). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/include/mem3.hrl"). + + +-define(TDEF(A), {atom_to_list(A), fun A/0}). + + +main_test_() -> + { + setup, + fun setup/0, + fun teardown/1, + [ + ?TDEF(t_shard_moves_avoid_sequence_rewinds) + ] + }. + + +setup() -> + test_util:start_couch([fabric]). + + + +teardown(Ctx) -> + meck:unload(), + test_util:stop_couch(Ctx). + + +t_shard_moves_avoid_sequence_rewinds() -> + DocCnt = 30, + DbName = ?tempdb(), + + ok = fabric:create_db(DbName, [{q,1}, {n,1}]), + lists:foreach(fun(I) -> + update_doc(DbName, #doc{id = erlang:integer_to_binary(I)}) + end, lists:seq(1, DocCnt)), + + {ok, _, Seq1, 0} = changes(DbName, #changes_args{limit = 1, since ="now"}), + [{_, Range, {Seq, Uuid, _}}] = seq_decode(Seq1), + + % Transform Seq1 pretending it came from a fake source node, before the + % shard was moved to the current node. + SrcNode = 'srcnode@srchost', + Seq2 = seq_encode([{SrcNode, Range, {Seq, Uuid, SrcNode}}]), + + % First, check when the shard file epoch is mismatched epoch and the + % sequence would rewind. 
This ensures the epoch and uuid check protection + % in couch_db works as intended. + ResBadEpoch = changes(DbName, #changes_args{limit = 1, since = Seq2}), + ?assertMatch({ok, _, _, _}, ResBadEpoch), + {ok, _, _, PendingBadEpoch} = ResBadEpoch, + ?assertEqual(DocCnt - 1, PendingBadEpoch), + + % Mock epoch checking to pretend that shard actually used to live on + % SrcNode. In this case, we should not have rewinds. + mock_epochs([{node(), DocCnt}, {SrcNode, 1}]), + ResMockedEpoch = changes(DbName, #changes_args{limit = 1, since = Seq2}), + ?assertMatch({ok, _, _, _}, ResMockedEpoch), + {ok, _, _, PendingMockedEpoch} = ResMockedEpoch, + ?assertEqual(0, PendingMockedEpoch), + + ok = fabric:delete_db(DbName, []). + + +changes_callback(start, Acc) -> + {ok, Acc}; + +changes_callback({change, {Change}}, Acc) -> + CM = maps:from_list(Change), + {ok, [CM | Acc]}; + +changes_callback({stop, EndSeq, Pending}, Acc) -> + {ok, Acc, EndSeq, Pending}. + + +changes(DbName, #changes_args{} = Args) -> + fabric_util:isolate(fun() -> + fabric:changes(DbName, fun changes_callback/2, [], Args) + end). + + +update_doc(DbName, #doc{} = Doc) -> + fabric_util:isolate(fun() -> + case fabric:update_doc(DbName, Doc, [?ADMIN_CTX]) of + {ok, Res} -> Res + end + end). + + +seq_decode(Seq) -> + % This is copied from fabric_view_changes + Pattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$", + Options = [{capture, [opaque], binary}], + {match, Seq1} = re:run(Seq, Pattern, Options), + binary_to_term(couch_util:decodeBase64Url(Seq1)). + + +seq_encode(Unpacked) -> + % Copied from fabric_view_changes + Opaque = couch_util:encodeBase64Url(term_to_binary(Unpacked, [compressed])), + ?l2b(["30", $-, Opaque]). + + +mock_epochs(Epochs) -> + % Since we made up a node name we'll have to mock epoch checking + meck:new(couch_db_engine, [passthrough]), + meck:expect(couch_db_engine, get_epochs, fun(_) -> Epochs end). -- cgit v1.2.1