diff options
author | Nick Vatamaniuc <vatamane@gmail.com> | 2022-09-30 23:39:08 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2022-10-03 12:21:02 -0400 |
commit | 70478e6616e81bdae6da1a795c986e90d4fd5c6b (patch) | |
tree | 5d75ed144381aeb4c456f0c2f70630e64a01ab17 | |
parent | fab8c4f15e0f7b4c3a621671afeb01034a551df6 (diff) | |
download | couchdb-70478e6616e81bdae6da1a795c986e90d4fd5c6b.tar.gz |
Introduce {rmin, R} option for fabric:handle_response/4,5
This option is useful when we want to accumulate at least R result copies for
each document. Typically R=2 in a default 3 node cluster. Without shard
splitting, this would simply be waiting for at least R copies returned from
each range. However, in the case when shard copies don't have exactly the same
range boundaries, the correct logic is to try to completely cover the ring at
least R times. The algorithm for checking if the ring can be covered is already
implemented, we just add it as an option to fabric:handle_response/4,5.
This is essentially another preparatory PR to implement the optimized
`fabric:bulk_get(...)` API as described in issue #4183 and it's just split out
as a separate commit as it's a fairly standalone change.
Additionally, I noticed that the unit tests in fabric_ring didn't cover the
cleanup callback function, so added tests to make sure we exercise that code
path.
Emacs + erlang_ls also flagged an unused header include, which is why it
was removed.
-rw-r--r-- | src/fabric/src/fabric_ring.erl | 132 | ||||
-rw-r--r-- | src/mem3/src/mem3_util.erl | 32 |
2 files changed, 157 insertions, 7 deletions
diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl index 9349efb90..3973892d4 100644 --- a/src/fabric/src/fabric_ring.erl +++ b/src/fabric/src/fabric_ring.erl @@ -24,7 +24,6 @@ handle_response/5 ]). --include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). -type fabric_dict() :: [{#shard{}, any()}]. @@ -118,6 +117,14 @@ handle_response(Shard, Response, Workers, Responses, RingOpts) -> % ring, where all copies at the start of the range and end of the range must % have the same boundary values. % +% * When RingOpts is [{rmin, R}], where R is an integer, accumulate responses +% until the ring can be completed at least R times. If shards have not been +% split, this would be the same as waiting until there are at least R +% responses for each shard range. Also of note is that [] is not exactly +% equivalent to [{rmin, 1}], as with [{rmin, 1}] it's possible that some +% shard ranges would return more than R copies while waiting for other +% ranges to respond. +% % * When RingOpts is [{any, [#shard{}]}] responses are accepted from any of % the provided list of shards. This type of ring might be used when querying % a partitioned database. As soon as a result from any of the shards @@ -133,6 +140,10 @@ handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) -> #shard{range = [B, E]} = Shard, Responses1 = [{{B, E}, Shard, Response} | Responses], handle_response_ring(Workers1, Responses1, CleanupCb); + [{rmin, RMin}] when is_integer(RMin), RMin >= 1 -> + #shard{range = [B, E]} = Shard, + Responses1 = [{{B, E}, Shard, Response} | Responses], + handle_response_rmin(RMin, Workers1, Responses1, CleanupCb); [{any, Any}] -> handle_response_any(Shard, Response, Workers1, Any, CleanupCb); [all] -> @@ -159,6 +170,18 @@ handle_response_ring(Workers, Responses, CleanupCb) -> {stop, fabric_dict:from_list(UsedResponses)} end. 
+handle_response_rmin(RMin, Workers, Responses, CleanupCb) -> + {MinB, MaxE} = range_bounds(Workers, Responses), + Shards = lists:map(fun({_, S, _}) -> S end, Responses), + case mem3_util:calculate_max_n(Shards, MinB, MaxE) of + MaxN when is_integer(MaxN), MaxN < RMin -> + {ok, {Workers, Responses}}; + MaxN when is_integer(MaxN), MaxN >= RMin -> + UsedResponses = lists:map(fun({_, S, R}) -> {S, R} end, Responses), + stop_unused_workers(Workers, Responses, UsedResponses, CleanupCb), + {stop, fabric_dict:from_list(UsedResponses)} + end. + handle_response_any(Shard, Response, Workers, Any, CleanupCb) -> case lists:member(Shard#shard{ref = undefined}, Any) of true -> @@ -448,6 +471,50 @@ handle_response_backtracking_test() -> Result4 = handle_response(Shard4, 45, Workers4, Responses3, [], undefined), ?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4). +handle_response_rmin_test() -> + Shard1 = mk_shard("n1", [0, 5]), + Shard2 = mk_shard("n1", [6, 9]), + Shard3 = mk_shard("n1", [10, ?RING_END]), + Shard4 = mk_shard("n2", [2, ?RING_END]), + Shard5 = mk_shard("n3", [0, 1]), + + Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4, Shard5], nil), + + Opts = [{rmin, 2}], + + Result1 = handle_response(Shard1, 101, Workers1, [], Opts, undefined), + ?assertMatch({ok, {_, _}}, Result1), + {ok, {Workers2, Responses1}} = Result1, + + Result2 = handle_response(Shard3, 103, Workers2, Responses1, Opts, undefined), + ?assertMatch({ok, {_, _}}, Result2), + {ok, {Workers3, Responses2}} = Result2, + + Result3 = handle_response(Shard4, 104, Workers3, Responses2, Opts, undefined), + ?assertMatch({ok, {_, _}}, Result3), + {ok, {Workers4, Responses3}} = Result3, + + Result4 = handle_response(Shard5, 105, Workers4, Responses3, Opts, undefined), + % Even though Shard4 and Shard5 would complete a full ring we're not done + % we need two full rings since our rmin is 2. 
+ ?assertMatch({ok, {_, _}}, Result4), + {ok, {Workers5, Responses4}} = Result4, + + Result5 = handle_response(Shard2, 102, Workers5, Responses4, Opts, undefined), + ?assertMatch({stop, [_ | _]}, Result5), + + {stop, FinalResponses} = Result5, + ?assertEqual( + [ + {Shard1, 101}, + {Shard2, 102}, + {Shard3, 103}, + {Shard4, 104}, + {Shard5, 105} + ], + lists:sort(FinalResponses) + ). + handle_response_ring_opts_any_test() -> Shard1 = mk_shard("n1", [0, 5]), Shard2 = mk_shard("n2", [0, 1]), @@ -546,6 +613,64 @@ node_down_test() -> ?assertEqual(error, node_down(n3, Workers5, Responses3)). +% Check that cleanup callback for fabric:handle_response/* gets called both to +% kill workers which haven't returned or results which were not used (for +% example if we wanted to stream for only the results we picked and throw the +% other results away). +% +handle_response_cleanup_callback_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_cleanup_unused), + ?TDEF_FE(t_cleanup_not_returned) + ] + }. + +setup() -> + meck:new(rexi, [passthrough]), + meck:expect(rexi, kill_all, 1, ok), + ok. + +teardown(_) -> + meck:unload(). + +t_cleanup_unused(_) -> + Ref1 = make_ref(), + Ref2 = make_ref(), + Ref3 = make_ref(), + Shard1 = mk_shard("n1", [0, 1], Ref1), + Shard2 = mk_shard("n2", [0, 1], Ref2), + Shard3 = mk_shard("n1", [2, ?RING_END], Ref3), + + Workers1 = fabric_dict:init([Shard1, Shard2, Shard3], nil), + {ok, {Workers2, Responses1}} = handle_response(Shard1, 42, Workers1, []), + {ok, {Workers3, Responses2}} = handle_response(Shard2, 43, Workers2, Responses1), + {stop, [{_, 42}, {_, 44}]} = handle_response(Shard3, 44, Workers3, Responses2), + ?assertMatch( + [ + {_, {rexi, kill_all, [[{n2, Ref2}]]}, ok} + ], + meck:history(rexi) + ). 
+ +t_cleanup_not_returned(_) -> + Ref1 = make_ref(), + Ref2 = make_ref(), + Shard1 = mk_shard("n1", [0, ?RING_END], Ref1), + Shard2 = mk_shard("n2", [0, ?RING_END], Ref2), + + Workers = fabric_dict:init([Shard1, Shard2], nil), + {stop, [{_, 42}]} = handle_response(Shard1, 42, Workers, []), + ?assertMatch( + [ + {_, {rexi, kill_all, [[{n2, Ref2}]]}, ok} + ], + meck:history(rexi) + ). + mk_cnts(Ranges) -> Shards = lists:map(fun mk_shard/1, Ranges), fabric_dict:init([S#shard{ref = make_ref()} || S <- Shards], nil). @@ -557,8 +682,11 @@ mk_shard([B, E]) when is_integer(B), is_integer(E) -> #shard{range = [B, E]}. mk_shard(Name, Range) -> + mk_shard(Name, Range, undefined). + +mk_shard(Name, Range, Ref) -> Node = list_to_atom(Name), BName = list_to_binary(Name), - #shard{name = BName, node = Node, range = Range}. + #shard{name = BName, node = Node, range = Range, ref = Ref}. -endif. diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl index 6a4ae1327..f05fe7378 100644 --- a/src/mem3/src/mem3_util.erl +++ b/src/mem3/src/mem3_util.erl @@ -43,7 +43,8 @@ get_ring/4, non_overlapping_shards/1, non_overlapping_shards/3, - calculate_max_n/1 + calculate_max_n/1, + calculate_max_n/3 ]). %% do not use outside mem3. @@ -502,6 +503,9 @@ non_overlapping_shards(Shards, Start, End) -> % across all the ranges. If the ring is incomplete it will return 0. % If there it is an n = 1 database, it should return 1, etc. calculate_max_n(Shards) -> + calculate_max_n(Shards, 0, ?RING_END). + +calculate_max_n(Shards, Start, End) when is_integer(Start), is_integer(End) -> Ranges = lists:map( fun(Shard) -> [B, E] = mem3:range(Shard), @@ -509,13 +513,15 @@ calculate_max_n(Shards) -> end, Shards ), - calculate_max_n(Ranges, get_ring(Ranges), 0). + FirstRing = get_ring(Ranges, Start, End), + calculate_max_n(Ranges, FirstRing, Start, End, 0). 
-calculate_max_n(_Ranges, [], N) -> +calculate_max_n(_Ranges, [], _Start, _End, N) -> N; -calculate_max_n(Ranges, Ring, N) -> +calculate_max_n(Ranges, Ring, Start, End, N) -> NewRanges = Ranges -- Ring, - calculate_max_n(NewRanges, get_ring(NewRanges), N + 1). + NewRing = get_ring(NewRanges, Start, End), + calculate_max_n(NewRanges, NewRing, Start, End, N + 1). get_ring(Ranges) -> get_ring(Ranges, fun sort_ranges_fun/2, 0, ?RING_END). @@ -752,6 +758,22 @@ calculate_max_n_test_() -> ] ]. +calculate_max_n_custom_range_test_() -> + [ + ?_assertEqual(Res, calculate_max_n(Shards, B, E)) + || {Res, Shards, B, E} <- [ + {0, [], 1, 15}, + {0, [], 0, 15}, + {0, [shard(1, 10)], 1, 15}, + {0, [shard(0, 8)], 1, 15}, + {1, [shard(0, 15)], 0, 15}, + {1, [shard(0, 15), shard(1, 15)], 0, 15}, + {2, [shard(0, 15), shard(0, 15)], 0, 15}, + {2, [shard(0, 1), shard(2, 15), shard(0, 15)], 0, 15}, + {0, [shard(0, 3), shard(3, 15), shard(1, 15)], 0, 15} + ] + ]. + shard(Begin, End) -> #shard{range = [Begin, End]}. |