summaryrefslogtreecommitdiff
path: root/src/fabric/src/fabric_db_partition_info.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/fabric/src/fabric_db_partition_info.erl')
-rw-r--r--src/fabric/src/fabric_db_partition_info.erl84
1 files changed, 70 insertions, 14 deletions
diff --git a/src/fabric/src/fabric_db_partition_info.erl b/src/fabric/src/fabric_db_partition_info.erl
index 2978832f0..954c52db2 100644
--- a/src/fabric/src/fabric_db_partition_info.erl
+++ b/src/fabric/src/fabric_db_partition_info.erl
@@ -17,15 +17,27 @@
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
+
+-record(acc, {
+ counters,
+ replies,
+ ring_opts
+}).
+
+
go(DbName, Partition) ->
- Shards = mem3:shards(DbName, <<Partition/binary, ":foo">>),
+ Shards = mem3:shards(DbName, couch_partition:shard_key(Partition)),
Workers = fabric_util:submit_jobs(Shards, get_partition_info, [Partition]),
RexiMon = fabric_util:create_monitors(Shards),
Fun = fun handle_message/3,
- Acc0 = {fabric_dict:init(Workers, nil), []},
+ Acc0 = #acc{
+ counters = fabric_dict:init(Workers, nil),
+ replies = [],
+ ring_opts = [{any, Shards}]
+ },
try
case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
- {ok, Acc} -> {ok, Acc};
+ {ok, Res} -> {ok, Res};
{timeout, {WorkersDict, _}} ->
DefunctWorkers = fabric_util:remove_done_workers(
WorkersDict,
@@ -42,36 +54,39 @@ go(DbName, Partition) ->
rexi_monitor:stop(RexiMon)
end.
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
- case fabric_util:remove_down_workers(Counters, NodeRef) of
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, #acc{} = Acc) ->
+ #acc{counters = Counters, ring_opts = RingOpts} = Acc,
+ case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of
{ok, NewCounters} ->
- {ok, {NewCounters, Acc}};
+ {ok, Acc#acc{counters = NewCounters}};
error ->
{error, {nodedown, <<"progress not possible">>}}
end;
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+handle_message({rexi_EXIT, Reason}, Shard, #acc{} = Acc) ->
+ #acc{counters = Counters, ring_opts = RingOpts} = Acc,
NewCounters = fabric_dict:erase(Shard, Counters),
- case fabric_ring:is_progress_possible(NewCounters) of
+ case fabric_ring:is_progress_possible(NewCounters, RingOpts) of
true ->
- {ok, {NewCounters, Acc}};
+ {ok, Acc#acc{counters = NewCounters}};
false ->
{error, Reason}
end;
-handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
- Acc2 = [Info | Acc],
+handle_message({ok, Info}, #shard{dbname=Name} = Shard, #acc{} = Acc) ->
+ #acc{counters = Counters, replies = Replies} = Acc,
+ Replies1 = [Info | Replies],
Counters1 = fabric_dict:erase(Shard, Counters),
case fabric_dict:size(Counters1) =:= 0 of
true ->
- [FirstInfo | RestInfos] = Acc2,
+ [FirstInfo | RestInfos] = Replies1,
PartitionInfo = get_max_partition_size(FirstInfo, RestInfos),
{stop, [{db_name, Name} | format_partition(PartitionInfo)]};
false ->
- {ok, {Counters1, Acc2}}
+ {ok, Acc#acc{counters = Counters1, replies = Replies1}}
end;
-handle_message(_, _, Acc) ->
+handle_message(_, _, #acc{} = Acc) ->
{ok, Acc}.
@@ -97,3 +112,44 @@ format_partition(PartitionInfo) ->
{value, {sizes, Size}, PartitionInfo1} = lists:keytake(sizes, 1, PartitionInfo),
[{sizes, {Size}} | PartitionInfo1].
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+node_down_test() ->
+ [S1, S2] = [mk_shard("n1", [0, 4]), mk_shard("n2", [0, 8])],
+ Acc1 = #acc{
+ counters = fabric_dict:init([S1, S2], nil),
+ ring_opts = [{any, [S1, S2]}]
+ },
+
+ N1 = S1#shard.node,
+ {ok, Acc2} = handle_message({rexi_DOWN, nil, {nil, N1}, nil}, nil, Acc1),
+ ?assertEqual([{S2, nil}], Acc2#acc.counters),
+
+ N2 = S2#shard.node,
+ ?assertEqual({error, {nodedown, <<"progress not possible">>}},
+ handle_message({rexi_DOWN, nil, {nil, N2}, nil}, nil, Acc2)).
+
+
+worker_exit_test() ->
+ [S1, S2] = [mk_shard("n1", [0, 4]), mk_shard("n2", [0, 8])],
+ Acc1 = #acc{
+ counters = fabric_dict:init([S1, S2], nil),
+ ring_opts = [{any, [S1, S2]}]
+ },
+
+ {ok, Acc2} = handle_message({rexi_EXIT, boom}, S1, Acc1),
+ ?assertEqual([{S2, nil}], Acc2#acc.counters),
+
+ ?assertEqual({error, bam}, handle_message({rexi_EXIT, bam}, S2, Acc2)).
+
+
+mk_shard(Name, Range) ->
+ Node = list_to_atom(Name),
+ BName = list_to_binary(Name),
+ #shard{name = BName, node = Node, range = Range}.
+
+-endif.