summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@gmail.com>2022-09-30 23:39:08 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2022-10-03 12:21:02 -0400
commit70478e6616e81bdae6da1a795c986e90d4fd5c6b (patch)
tree5d75ed144381aeb4c456f0c2f70630e64a01ab17
parentfab8c4f15e0f7b4c3a621671afeb01034a551df6 (diff)
downloadcouchdb-70478e6616e81bdae6da1a795c986e90d4fd5c6b.tar.gz
Introduce {rmin, R} option for fabric:handle_response/4,5
This option is useful when we want to accumulate at least R result copies for each document. Typically R=2 in a default 3 node cluster. Without shard splitting, this would simply be waiting for at least R copies returned from each range. However, in the case when shard copies don't have exactly the same range boundaries, the correct logic is to try to completely cover the ring at least R times. The algorith for checking if the ring can be covered is already implemented, we just add it as an option to fabric:handle_response/4,5. This is essentially another preparatory PR to implement the optimized `fabric:bulk_get(...)` API as described in issue #4183 and it's just split out as a seprate commit as it's a fairly standalone change. Additionally, had noticed that the unit tests in fabric_ring didn't cover the cleanup callback function, so added tests to make sure we exercise that code path. Emacs + erlang_ls also noticed the unused header included, so that's why that was removed.
-rw-r--r--src/fabric/src/fabric_ring.erl132
-rw-r--r--src/mem3/src/mem3_util.erl32
2 files changed, 157 insertions, 7 deletions
diff --git a/src/fabric/src/fabric_ring.erl b/src/fabric/src/fabric_ring.erl
index 9349efb90..3973892d4 100644
--- a/src/fabric/src/fabric_ring.erl
+++ b/src/fabric/src/fabric_ring.erl
@@ -24,7 +24,6 @@
handle_response/5
]).
--include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-type fabric_dict() :: [{#shard{}, any()}].
@@ -118,6 +117,14 @@ handle_response(Shard, Response, Workers, Responses, RingOpts) ->
% ring, where all copies at the start of the range and end of the range must
% have the same boundary values.
%
+% * When RingOpts is [{rmin, R}], where R is an integer, accumulate responses
+% until the ring can be completed at least R times. If shards have not been
+% split, this would be the same as waiting until there are at least R
+% responses for each shard range. Also of note is that [] is not exactly
+% equivalent to [{rmin, 1}], as with [{rmin, 1}] it's possible that some
+% shard ranges would return more than R copies while waiting for other
+% ranges to respond.
+%
% * When RingOpts is [{any, [#shard{}]}] responses are accepted from any of
% the provided list of shards. This type of ring might be used when querying
% a partitioned database. As soon as a result from any of the shards
@@ -133,6 +140,10 @@ handle_response(Shard, Response, Workers, Responses, RingOpts, CleanupCb) ->
#shard{range = [B, E]} = Shard,
Responses1 = [{{B, E}, Shard, Response} | Responses],
handle_response_ring(Workers1, Responses1, CleanupCb);
+ [{rmin, RMin}] when is_integer(RMin), RMin >= 1 ->
+ #shard{range = [B, E]} = Shard,
+ Responses1 = [{{B, E}, Shard, Response} | Responses],
+ handle_response_rmin(RMin, Workers1, Responses1, CleanupCb);
[{any, Any}] ->
handle_response_any(Shard, Response, Workers1, Any, CleanupCb);
[all] ->
@@ -159,6 +170,18 @@ handle_response_ring(Workers, Responses, CleanupCb) ->
{stop, fabric_dict:from_list(UsedResponses)}
end.
+handle_response_rmin(RMin, Workers, Responses, CleanupCb) ->
+ {MinB, MaxE} = range_bounds(Workers, Responses),
+ Shards = lists:map(fun({_, S, _}) -> S end, Responses),
+ case mem3_util:calculate_max_n(Shards, MinB, MaxE) of
+ MaxN when is_integer(MaxN), MaxN < RMin ->
+ {ok, {Workers, Responses}};
+ MaxN when is_integer(MaxN), MaxN >= RMin ->
+ UsedResponses = lists:map(fun({_, S, R}) -> {S, R} end, Responses),
+ stop_unused_workers(Workers, Responses, UsedResponses, CleanupCb),
+ {stop, fabric_dict:from_list(UsedResponses)}
+ end.
+
handle_response_any(Shard, Response, Workers, Any, CleanupCb) ->
case lists:member(Shard#shard{ref = undefined}, Any) of
true ->
@@ -448,6 +471,50 @@ handle_response_backtracking_test() ->
Result4 = handle_response(Shard4, 45, Workers4, Responses3, [], undefined),
?assertEqual({stop, [{Shard3, 44}, {Shard4, 45}]}, Result4).
+handle_response_rmin_test() ->
+ Shard1 = mk_shard("n1", [0, 5]),
+ Shard2 = mk_shard("n1", [6, 9]),
+ Shard3 = mk_shard("n1", [10, ?RING_END]),
+ Shard4 = mk_shard("n2", [2, ?RING_END]),
+ Shard5 = mk_shard("n3", [0, 1]),
+
+ Workers1 = fabric_dict:init([Shard1, Shard2, Shard3, Shard4, Shard5], nil),
+
+ Opts = [{rmin, 2}],
+
+ Result1 = handle_response(Shard1, 101, Workers1, [], Opts, undefined),
+ ?assertMatch({ok, {_, _}}, Result1),
+ {ok, {Workers2, Responses1}} = Result1,
+
+ Result2 = handle_response(Shard3, 103, Workers2, Responses1, Opts, undefined),
+ ?assertMatch({ok, {_, _}}, Result2),
+ {ok, {Workers3, Responses2}} = Result2,
+
+ Result3 = handle_response(Shard4, 104, Workers3, Responses2, Opts, undefined),
+ ?assertMatch({ok, {_, _}}, Result3),
+ {ok, {Workers4, Responses3}} = Result3,
+
+ Result4 = handle_response(Shard5, 105, Workers4, Responses3, Opts, undefined),
+ % Even though Shard4 and Shard5 would complete a full ring we're not done
+ % we need two full rings since our rmin is 2.
+ ?assertMatch({ok, {_, _}}, Result4),
+ {ok, {Workers5, Responses4}} = Result4,
+
+ Result5 = handle_response(Shard2, 102, Workers5, Responses4, Opts, undefined),
+ ?assertMatch({stop, [_ | _]}, Result5),
+
+ {stop, FinalResponses} = Result5,
+ ?assertEqual(
+ [
+ {Shard1, 101},
+ {Shard2, 102},
+ {Shard3, 103},
+ {Shard4, 104},
+ {Shard5, 105}
+ ],
+ lists:sort(FinalResponses)
+ ).
+
handle_response_ring_opts_any_test() ->
Shard1 = mk_shard("n1", [0, 5]),
Shard2 = mk_shard("n2", [0, 1]),
@@ -546,6 +613,64 @@ node_down_test() ->
?assertEqual(error, node_down(n3, Workers5, Responses3)).
+% Check that cleanup callback for fabric:handle_response/* gets called both to
+% kill workers which haven't returned or results which were not used (for
+% example if we wanted to stream for only the results we picked and throw the
+% other results away).
+%
+handle_response_cleanup_callback_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(t_cleanup_unused),
+ ?TDEF_FE(t_cleanup_not_returned)
+ ]
+ }.
+
+setup() ->
+ meck:new(rexi, [passthrough]),
+ meck:expect(rexi, kill_all, 1, ok),
+ ok.
+
+teardown(_) ->
+ meck:unload().
+
+t_cleanup_unused(_) ->
+ Ref1 = make_ref(),
+ Ref2 = make_ref(),
+ Ref3 = make_ref(),
+ Shard1 = mk_shard("n1", [0, 1], Ref1),
+ Shard2 = mk_shard("n2", [0, 1], Ref2),
+ Shard3 = mk_shard("n1", [2, ?RING_END], Ref3),
+
+ Workers1 = fabric_dict:init([Shard1, Shard2, Shard3], nil),
+ {ok, {Workers2, Responses1}} = handle_response(Shard1, 42, Workers1, []),
+ {ok, {Workers3, Responses2}} = handle_response(Shard2, 43, Workers2, Responses1),
+ {stop, [{_, 42}, {_, 44}]} = handle_response(Shard3, 44, Workers3, Responses2),
+ ?assertMatch(
+ [
+ {_, {rexi, kill_all, [[{n2, Ref2}]]}, ok}
+ ],
+ meck:history(rexi)
+ ).
+
+t_cleanup_not_returned(_) ->
+ Ref1 = make_ref(),
+ Ref2 = make_ref(),
+ Shard1 = mk_shard("n1", [0, ?RING_END], Ref1),
+ Shard2 = mk_shard("n2", [0, ?RING_END], Ref2),
+
+ Workers = fabric_dict:init([Shard1, Shard2], nil),
+ {stop, [{_, 42}]} = handle_response(Shard1, 42, Workers, []),
+ ?assertMatch(
+ [
+ {_, {rexi, kill_all, [[{n2, Ref2}]]}, ok}
+ ],
+ meck:history(rexi)
+ ).
+
mk_cnts(Ranges) ->
Shards = lists:map(fun mk_shard/1, Ranges),
fabric_dict:init([S#shard{ref = make_ref()} || S <- Shards], nil).
@@ -557,8 +682,11 @@ mk_shard([B, E]) when is_integer(B), is_integer(E) ->
#shard{range = [B, E]}.
mk_shard(Name, Range) ->
+ mk_shard(Name, Range, undefined).
+
+mk_shard(Name, Range, Ref) ->
Node = list_to_atom(Name),
BName = list_to_binary(Name),
- #shard{name = BName, node = Node, range = Range}.
+ #shard{name = BName, node = Node, range = Range, ref = Ref}.
-endif.
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
index 6a4ae1327..f05fe7378 100644
--- a/src/mem3/src/mem3_util.erl
+++ b/src/mem3/src/mem3_util.erl
@@ -43,7 +43,8 @@
get_ring/4,
non_overlapping_shards/1,
non_overlapping_shards/3,
- calculate_max_n/1
+ calculate_max_n/1,
+ calculate_max_n/3
]).
%% do not use outside mem3.
@@ -502,6 +503,9 @@ non_overlapping_shards(Shards, Start, End) ->
% across all the ranges. If the ring is incomplete it will return 0.
% If there it is an n = 1 database, it should return 1, etc.
calculate_max_n(Shards) ->
+ calculate_max_n(Shards, 0, ?RING_END).
+
+calculate_max_n(Shards, Start, End) when is_integer(Start), is_integer(End) ->
Ranges = lists:map(
fun(Shard) ->
[B, E] = mem3:range(Shard),
@@ -509,13 +513,15 @@ calculate_max_n(Shards) ->
end,
Shards
),
- calculate_max_n(Ranges, get_ring(Ranges), 0).
+ FirstRing = get_ring(Ranges, Start, End),
+ calculate_max_n(Ranges, FirstRing, Start, End, 0).
-calculate_max_n(_Ranges, [], N) ->
+calculate_max_n(_Ranges, [], _Start, _End, N) ->
N;
-calculate_max_n(Ranges, Ring, N) ->
+calculate_max_n(Ranges, Ring, Start, End, N) ->
NewRanges = Ranges -- Ring,
- calculate_max_n(NewRanges, get_ring(NewRanges), N + 1).
+ NewRing = get_ring(NewRanges, Start, End),
+ calculate_max_n(NewRanges, NewRing, Start, End, N + 1).
get_ring(Ranges) ->
get_ring(Ranges, fun sort_ranges_fun/2, 0, ?RING_END).
@@ -752,6 +758,22 @@ calculate_max_n_test_() ->
]
].
+calculate_max_n_custom_range_test_() ->
+ [
+ ?_assertEqual(Res, calculate_max_n(Shards, B, E))
+ || {Res, Shards, B, E} <- [
+ {0, [], 1, 15},
+ {0, [], 0, 15},
+ {0, [shard(1, 10)], 1, 15},
+ {0, [shard(0, 8)], 1, 15},
+ {1, [shard(0, 15)], 0, 15},
+ {1, [shard(0, 15), shard(1, 15)], 0, 15},
+ {2, [shard(0, 15), shard(0, 15)], 0, 15},
+ {2, [shard(0, 1), shard(2, 15), shard(0, 15)], 0, 15},
+ {0, [shard(0, 3), shard(3, 15), shard(1, 15)], 0, 15}
+ ]
+ ].
+
shard(Begin, End) ->
#shard{range = [Begin, End]}.