diff options
Diffstat (limited to 'src/fabric/src/fabric_db_partition_info.erl')
-rw-r--r-- | src/fabric/src/fabric_db_partition_info.erl | 84 |
1 files changed, 70 insertions, 14 deletions
diff --git a/src/fabric/src/fabric_db_partition_info.erl b/src/fabric/src/fabric_db_partition_info.erl index 2978832f0..954c52db2 100644 --- a/src/fabric/src/fabric_db_partition_info.erl +++ b/src/fabric/src/fabric_db_partition_info.erl @@ -17,15 +17,27 @@ -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). + +-record(acc, { + counters, + replies, + ring_opts +}). + + go(DbName, Partition) -> - Shards = mem3:shards(DbName, <<Partition/binary, ":foo">>), + Shards = mem3:shards(DbName, couch_partition:shard_key(Partition)), Workers = fabric_util:submit_jobs(Shards, get_partition_info, [Partition]), RexiMon = fabric_util:create_monitors(Shards), Fun = fun handle_message/3, - Acc0 = {fabric_dict:init(Workers, nil), []}, + Acc0 = #acc{ + counters = fabric_dict:init(Workers, nil), + replies = [], + ring_opts = [{any, Shards}] + }, try case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of - {ok, Acc} -> {ok, Acc}; + {ok, Res} -> {ok, Res}; {timeout, {WorkersDict, _}} -> DefunctWorkers = fabric_util:remove_done_workers( WorkersDict, @@ -42,36 +54,39 @@ go(DbName, Partition) -> rexi_monitor:stop(RexiMon) end. -handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> - case fabric_util:remove_down_workers(Counters, NodeRef) of +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, #acc{} = Acc) -> + #acc{counters = Counters, ring_opts = RingOpts} = Acc, + case fabric_util:remove_down_workers(Counters, NodeRef, RingOpts) of {ok, NewCounters} -> - {ok, {NewCounters, Acc}}; + {ok, Acc#acc{counters = NewCounters}}; error -> {error, {nodedown, <<"progress not possible">>}} end; -handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> +handle_message({rexi_EXIT, Reason}, Shard, #acc{} = Acc) -> + #acc{counters = Counters, ring_opts = RingOpts} = Acc, NewCounters = fabric_dict:erase(Shard, Counters), - case fabric_ring:is_progress_possible(NewCounters) of + case fabric_ring:is_progress_possible(NewCounters, RingOpts) of true -> - {ok, {NewCounters, Acc}}; + {ok, Acc#acc{counters = NewCounters}}; false -> {error, Reason} end; -handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) -> - Acc2 = [Info | Acc], +handle_message({ok, Info}, #shard{dbname=Name} = Shard, #acc{} = Acc) -> + #acc{counters = Counters, replies = Replies} = Acc, + Replies1 = [Info | Replies], Counters1 = fabric_dict:erase(Shard, Counters), case fabric_dict:size(Counters1) =:= 0 of true -> - [FirstInfo | RestInfos] = Acc2, + [FirstInfo | RestInfos] = Replies1, PartitionInfo = get_max_partition_size(FirstInfo, RestInfos), {stop, [{db_name, Name} | format_partition(PartitionInfo)]}; false -> - {ok, {Counters1, Acc2}} + {ok, Acc#acc{counters = Counters1, replies = Replies1}} end; -handle_message(_, _, Acc) -> +handle_message(_, _, #acc{} = Acc) -> {ok, Acc}. @@ -97,3 +112,44 @@ format_partition(PartitionInfo) -> {value, {sizes, Size}, PartitionInfo1} = lists:keytake(sizes, 1, PartitionInfo), [{sizes, {Size}} | PartitionInfo1]. + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + + +node_down_test() -> + [S1, S2] = [mk_shard("n1", [0, 4]), mk_shard("n2", [0, 8])], + Acc1 = #acc{ + counters = fabric_dict:init([S1, S2], nil), + ring_opts = [{any, [S1, S2]}] + }, + + N1 = S1#shard.node, + {ok, Acc2} = handle_message({rexi_DOWN, nil, {nil, N1}, nil}, nil, Acc1), + ?assertEqual([{S2, nil}], Acc2#acc.counters), + + N2 = S2#shard.node, + ?assertEqual({error, {nodedown, <<"progress not possible">>}}, + handle_message({rexi_DOWN, nil, {nil, N2}, nil}, nil, Acc2)). + + +worker_exit_test() -> + [S1, S2] = [mk_shard("n1", [0, 4]), mk_shard("n2", [0, 8])], + Acc1 = #acc{ + counters = fabric_dict:init([S1, S2], nil), + ring_opts = [{any, [S1, S2]}] + }, + + {ok, Acc2} = handle_message({rexi_EXIT, boom}, S1, Acc1), + ?assertEqual([{S2, nil}], Acc2#acc.counters), + + ?assertEqual({error, bam}, handle_message({rexi_EXIT, bam}, S2, Acc2)). + + +mk_shard(Name, Range) -> + Node = list_to_atom(Name), + BName = list_to_binary(Name), + #shard{name = BName, node = Node, range = Range}. + +-endif. |