diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2018-04-30 13:05:33 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2018-04-30 13:23:05 -0500 |
commit | 97d836e57f6161c0a99a7e910ec00d78ee0823ed (patch) | |
tree | 4ae117ea3c7cf96c357380b60e7cefbc299254c7 | |
parent | a252e384e33401c7fc50d1d9798756b8c7fdd4f6 (diff) | |
download | couchdb-97d836e57f6161c0a99a7e910ec00d78ee0823ed.tar.gz |
[SQUERGE] Fix tests and code for fabric_doc_purge
-rw-r--r-- | src/fabric/src/fabric_doc_purge.erl | 629 |
1 files changed, 394 insertions, 235 deletions
diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl index b51f28eec..61b84d81b 100644 --- a/src/fabric/src/fabric_doc_purge.erl +++ b/src/fabric/src/fabric_doc_purge.erl @@ -24,8 +24,8 @@ -record(acc, { worker_uuids, - req_count, resps, + uuid_counts, w }). @@ -34,7 +34,7 @@ go(_, [], _) -> {ok, []}; go(DbName, IdsRevs, Options) -> % Generate our purge requests of {UUID, DocId, Revs} - {UUIDs, Reqs, Count} = create_reqs(IdsRevs, [], [], 0), + {UUIDs, Reqs} = create_reqs(IdsRevs, [], []), % Fire off rexi workers for each shard. {Workers, WorkerUUIDs} = dict:fold(fun(Shard, ShardReqs, {Ws, WUUIDs}) -> @@ -46,12 +46,18 @@ go(DbName, IdsRevs, Options) -> {[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]} end, {[], []}, group_reqs_by_shard(DbName, Reqs)), + UUIDCounts = lists:foldl(fun({_Worker, WUUIDs}, CountAcc) -> + lists:foldl(fun(UUID, InnerCountAcc) -> + dict:update_counter(UUID, 1, InnerCountAcc) + end, CountAcc, WUUIDs) + end, dict:new(), WorkerUUIDs), + RexiMon = fabric_util:create_monitors(Workers), Timeout = fabric_util:request_timeout(), Acc0 = #acc{ worker_uuids = WorkerUUIDs, - req_count = Count, resps = dict:from_list([{UUID, []} || UUID <- UUIDs]), + uuid_counts = UUIDCounts, w = w(DbName, Options) }, Acc2 = try rexi_utils:recv(Workers, #shard.ref, @@ -73,7 +79,8 @@ go(DbName, IdsRevs, Options) -> rexi_monitor:stop(RexiMon) end, - {ok, format_resps(UUIDs, Acc2)}. + FinalResps = format_resps(UUIDs, Acc2), + {resp_health(FinalResps), FinalResps}. handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) -> @@ -101,7 +108,6 @@ handle_message({ok, Replies}, Worker, Acc) -> resps = Resps } = Acc, {value, {_W, UUIDs}, Rest} = lists:keytake(Worker, 1, WorkerUUIDs), - couch_log:error("XKCD: ~p ~p :: ~p ~p", [length(UUIDs), length(Replies), UUIDs, Replies]), NewResps = append_resps(UUIDs, Replies, Resps), maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps}); @@ -109,14 +115,14 @@ handle_message({bad_request, Msg}, _, _) -> throw({bad_request, Msg}). -create_reqs([], UUIDs, Reqs, Count) -> - {lists:reverse(UUIDs), lists:reverse(Reqs), Count}; +create_reqs([], UUIDs, Reqs) -> + {lists:reverse(UUIDs), lists:reverse(Reqs)}; -create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs, Count) -> +create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs) -> UUID = couch_uuids:new(), NewUUIDs = [UUID | UUIDs], NewReqs = [{UUID, Id, Revs} | Reqs], - create_reqs(RestIdsRevs, NewUUIDs, NewReqs, Count + 1). + create_reqs(RestIdsRevs, NewUUIDs, NewReqs). group_reqs_by_shard(DbName, Reqs) -> @@ -151,11 +157,11 @@ append_resps([UUID | RestUUIDs], [Reply | RestReplies], Resps) -> maybe_stop(#acc{worker_uuids = []} = Acc) -> {stop, Acc}; -maybe_stop(#acc{resps = Resps, w = W} = Acc) -> +maybe_stop(#acc{resps = Resps, uuid_counts = Counts, w = W} = Acc) -> try - dict:fold(fun(_UUID, UUIDResps, _) -> - couch_log:error("XKCD: ~p ~p", [UUIDResps, W]), - case has_quorum(UUIDResps, W) of + dict:fold(fun(UUID, UUIDResps, _) -> + UUIDCount = dict:fetch(UUID, Counts), + case has_quorum(UUIDResps, UUIDCount, W) of true -> ok; false -> throw(keep_going) end @@ -179,7 +185,9 @@ format_resps(UUIDs, #acc{} = Acc) -> [{UUID, Error} | ReplyAcc]; _ -> AllRevs = lists:usort(lists:flatten(OkReplies)), - Health = if length(OkReplies) >= W -> ok; true -> accepted end, + IsOk = length(OkReplies) >= W + andalso length(lists:usort(OkReplies)) == 1, + Health = if IsOk -> ok; true -> accepted end, [{UUID, {Health, AllRevs}} | ReplyAcc] end end, @@ -190,224 +198,375 @@ format_resps(_UUIDs, Else) -> Else. -has_quorum([], W) when W > 0 -> - false; -has_quorum(_, W) when W =< 0 -> - true; -has_quorum([{ok, _} | Rest], W) when W > 0 -> - has_quorum(Rest, W - 1). - - -%% % eunits -%% doc_purge_ok_test() -> -%% meck:new(couch_log), -%% meck:expect(couch_log, warning, fun(_,_) -> ok end), -%% meck:expect(couch_log, notice, fun(_,_) -> ok end), -%% -%% Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>, -%% UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1}, -%% Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>, -%% UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2}, -%% UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2], -%% Shards = -%% mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]), -%% Counters = dict:to_list( -%% group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)), -%% DocsDict = dict:new(), -%% -%% % ***test for W = 2 -%% AccW2 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"), -%% Counters, DocsDict}, -%% {ok, {WaitingCountW2_1,_,_,_,_} = AccW2_1} = -%% handle_message({ok,[{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW2), -%% ?assertEqual(2, WaitingCountW2_1), -%% {stop, FinalReplyW2 } = -%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, -%% lists:nth(2,Shards), AccW2_1), -%% ?assertEqual( -%% {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]}, -%% FinalReplyW2 -%% ), -%% -%% % ***test for W = 3 -%% AccW3 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"), -%% Counters, DocsDict}, -%% {ok, {WaitingCountW3_1,_,_,_,_} = AccW3_1} = -%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW3), -%% ?assertEqual(2, WaitingCountW3_1), -%% {ok, {WaitingCountW3_2,_,_,_,_} = AccW3_2} = -%% handle_message({ok,[{ok, Revs1}, {ok, Revs2}]}, -%% lists:nth(2,Shards), AccW3_1), -%% ?assertEqual(1, WaitingCountW3_2), -%% {stop, FinalReplyW3 } = -%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, -%% lists:nth(3,Shards), AccW3_2), -%% ?assertEqual( -%% {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]}, -%% FinalReplyW3 -%% ), -%% -%% % *** test rexi_exit on 1 node -%% Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"), -%% Counters, DocsDict}, -%% {ok, {WaitingCount1,_,_,_,_} = Acc1} = -%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0), -%% ?assertEqual(2, WaitingCount1), -%% {ok, {WaitingCount2,_,_,_,_} = Acc2} = -%% handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1), -%% ?assertEqual(1, WaitingCount2), -%% {stop, Reply} = -%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, -%% lists:nth(3,Shards), Acc2), -%% ?assertEqual( -%% {ok,[{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]}, -%% Reply -%% ), -%% -%% % *** test {error, purge_during_compaction_exceeded_limit} on all nodes -%% % *** still should return ok reply for the request -%% ErrPDCEL = {error, purge_during_compaction_exceeded_limit}, -%% Acc20 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"), -%% Counters, DocsDict}, -%% {ok, {WaitingCount21,_,_,_,_} = Acc21} = -%% handle_message({ok, [ErrPDCEL, ErrPDCEL]}, hd(Shards), Acc20), -%% ?assertEqual(2, WaitingCount21), -%% {ok, {WaitingCount22,_,_,_,_} = Acc22} = -%% handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(2,Shards), Acc21), -%% ?assertEqual(1, WaitingCount22), -%% {stop, Reply2 } = -%% handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(3,Shards), Acc22), -%% ?assertEqual( -%% {ok, [{UUID1, ErrPDCEL}, {UUID2, ErrPDCEL}]}, -%% Reply2 -%% ), -%% -%% % *** test {error, purged_docs_limit_exceeded} on all nodes -%% % *** still should return ok reply for the request -%% ErrPDLE = {error, purged_docs_limit_exceeded}, -%% Acc30 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"), -%% Counters, DocsDict}, -%% {ok, {WaitingCount31,_,_,_,_} = Acc31} = -%% handle_message({ok, [ErrPDLE, ErrPDLE]}, hd(Shards), Acc30), -%% ?assertEqual(2, WaitingCount31), -%% {ok, {WaitingCount32,_,_,_,_} = Acc32} = -%% handle_message({ok, [ErrPDLE, ErrPDLE]}, lists:nth(2,Shards), Acc31), -%% ?assertEqual(1, WaitingCount32), -%% {stop, Reply3 } = -%% handle_message({ok, [ErrPDLE, ErrPDLE]},lists:nth(3,Shards), Acc32), -%% ?assertEqual( -%% {ok, [{UUID1, ErrPDLE}, {UUID2, ErrPDLE}]}, -%% Reply3 -%% ), -%% meck:unload(couch_log). -%% -%% -%% doc_purge_accepted_test() -> -%% meck:new(couch_log), -%% meck:expect(couch_log, warning, fun(_,_) -> ok end), -%% meck:expect(couch_log, notice, fun(_,_) -> ok end), -%% -%% Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>, -%% UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1}, -%% Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>, -%% UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2}, -%% UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2], -%% Shards = -%% mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]), -%% Counters = dict:to_list( -%% group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)), -%% DocsDict = dict:new(), -%% -%% % *** test rexi_exit on 2 nodes -%% Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"), -%% Counters, DocsDict}, -%% {ok, {WaitingCount1,_,_,_,_} = Acc1} = -%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0), -%% ?assertEqual(2, WaitingCount1), -%% {ok, {WaitingCount2,_,_,_,_} = Acc2} = -%% handle_message({rexi_EXIT, nil}, lists:nth(2, Shards), Acc1), -%% ?assertEqual(1, WaitingCount2), -%% {stop, Reply} = -%% handle_message({rexi_EXIT, nil}, lists:nth(3, Shards), Acc2), -%% ?assertEqual( -%% {accepted, [{UUID1, {accepted, Revs1}}, {UUID2, {accepted, Revs2}}]}, -%% Reply -%% ), -%% meck:unload(couch_log). -%% -%% -%% doc_purge_error_test() -> -%% meck:new(couch_log), -%% meck:expect(couch_log, warning, fun(_,_) -> ok end), -%% meck:expect(couch_log, notice, fun(_,_) -> ok end), -%% -%% Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>, -%% UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1}, -%% Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>, -%% UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2}, -%% UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2], -%% Shards = -%% mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]), -%% Counters = dict:to_list( -%% group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)), -%% DocsDict = dict:new(), -%% -%% % *** test rexi_exit on all 3 nodes -%% Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"), -%% Counters, DocsDict}, -%% {ok, {WaitingCount1,_,_,_,_} = Acc1} = -%% handle_message({rexi_EXIT, nil}, hd(Shards), Acc0), -%% ?assertEqual(2, WaitingCount1), -%% {ok, {WaitingCount2,_,_,_,_} = Acc2} = -%% handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1), -%% ?assertEqual(1, WaitingCount2), -%% {stop, Reply} = -%% handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2), -%% ?assertEqual( -%% {error, [{UUID1, {error, internal_server_error}}, -%% {UUID2, {error, internal_server_error}}]}, -%% Reply -%% ), -%% -%% % ***test w quorum > # shards, which should fail immediately -%% Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]), -%% Counters2 = dict:to_list( -%% group_idrevs_by_shard_hack(<<"foo">>, Shards2, UUIDsIDdsRevs)), -%% AccW4 = {length(Shards2), length(UUIDsIDdsRevs), list_to_integer("2"), -%% Counters2, DocsDict}, -%% Bool = -%% case handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, -%% hd(Shards), AccW4) of -%% {stop, _Reply} -> -%% true; -%% _ -> false -%% end, -%% ?assertEqual(true, Bool), -%% -%% % *** test Docs with no replies should end up as {error, internal_server_error} -%% SA1 = #shard{node = a, range = [1]}, -%% SA2 = #shard{node = a, range = [2]}, -%% SB1 = #shard{node = b, range = [1]}, -%% SB2 = #shard{node = b, range = [2]}, -%% Counters3 = [{SA1,[UUID1]}, {SB1,[UUID1]}, -%% {SA2,[UUID2]}, {SB2,[UUID2]}], -%% Acc30 = {length(Counters3), length(UUIDsIDdsRevs), 2, Counters3, DocsDict}, -%% {ok, Acc31} = handle_message({ok, [{ok, Revs1}]}, SA1, Acc30), -%% {ok, Acc32} = handle_message({rexi_EXIT, nil}, SB1, Acc31), -%% {ok, Acc33} = handle_message({rexi_EXIT, nil}, SA2, Acc32), -%% {stop, Acc34} = handle_message({rexi_EXIT, nil}, SB2, Acc33), -%% ?assertEqual( -%% {error, [{UUID1, {accepted, Revs1}}, -%% {UUID2, {error, internal_server_error}}]}, -%% Acc34 -%% ), -%% meck:unload(couch_log). -%% -%% -%% % needed for testing to avoid having to start the mem3 application -%% group_idrevs_by_shard_hack(_DbName, Shards, UUIDsIdsRevs) -> -%% lists:foldl(fun({UUID, _Id, _Revs}, Dict0) -> -%% lists:foldl(fun(Shard, Dict1) -> -%% dict:append(Shard, UUID, Dict1) -%% end, Dict0, Shards) -%% end, dict:new(), UUIDsIdsRevs). +resp_health(Resps) -> + Healths = lists:usort([H || {H, _} <- Resps]), + HasError = lists:member(error, Healths), + HasAccepted = lists:member(accepted, Healths), + AllOk = Healths == [ok], + if + HasError -> error; + HasAccepted -> accepted; + AllOk -> ok; + true -> error + end. + + +has_quorum(Resps, Count, W) -> + OkResps = [R || {ok, _} = R <- Resps], + OkCounts = lists:foldl(fun(R, Acc) -> + dict:update_counter(R, 1, Acc) + end, dict:new(), OkResps), + MaxOk = lists:max([0 | element(2, lists:unzip(dict:to_list(OkCounts)))]), + if + MaxOk >= W -> true; + length(Resps) >= Count -> true; + true -> false + end. + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +purge_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + t_w2_ok(), + t_w3_ok(), + + t_w2_mixed_accepted(), + t_w3_mixed_accepted(), + + t_w2_exit1_ok(), + t_w2_exit2_accepted(), + t_w2_exit3_error(), + + t_w4_accepted(), + + t_mixed_ok_accepted(), + t_mixed_errors() + ] + }. + + +setup() -> + meck:new(couch_log), + meck:expect(couch_log, warning, fun(_, _) -> ok end), + meck:expect(couch_log, notice, fun(_, _) -> ok end). + + +teardown(_) -> + meck:unload(). + + +t_w2_ok() -> + ?_test(begin + Acc0 = create_init_acc(2), + Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, + + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {stop, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, true), + + Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2), + ?assertEqual(Expect, Resps), + ?assertEqual(ok, resp_health(Resps)) + end). + + +t_w3_ok() -> + ?_test(begin + Acc0 = create_init_acc(3), + Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, + + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), + ?assertEqual(Expect, Resps), + ?assertEqual(ok, resp_health(Resps)) + end). + + +t_w2_mixed_accepted() -> + ?_test(begin + Acc0 = create_init_acc(2), + Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]}, + Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]}, + + {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [ + {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]}, + {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]} + ], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2), + ?assertEqual(Expect, Resps), + ?assertEqual(accepted, resp_health(Resps)) + end). + + +t_w3_mixed_accepted() -> + ?_test(begin + Acc0 = create_init_acc(3), + Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]}, + Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]}, + + {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [ + {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]}, + {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]} + ], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2), + ?assertEqual(Expect, Resps), + ?assertEqual(accepted, resp_health(Resps)) + end). + + +t_w2_exit1_ok() -> + ?_test(begin + Acc0 = create_init_acc(2), + Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, + ExitMsg = {rexi_EXIT, blargh}, + + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), + ?assertEqual(Expect, Resps), + ?assertEqual(ok, resp_health(Resps)) + end). + + +t_w2_exit2_accepted() -> + ?_test(begin + Acc0 = create_init_acc(2), + Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, + ExitMsg = {rexi_EXIT, blargh}, + + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), + ?assertEqual(Expect, Resps), + ?assertEqual(accepted, resp_health(Resps)) + end). + + +t_w2_exit3_error() -> + ?_test(begin + Acc0 = create_init_acc(2), + ExitMsg = {rexi_EXIT, blargh}, + + {ok, Acc1} = handle_message(ExitMsg, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [ + {error, internal_server_error}, + {error, internal_server_error} + ], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), + ?assertEqual(Expect, Resps), + ?assertEqual(error, resp_health(Resps)) + end). + + +t_w4_accepted() -> + % Make sure we return when all workers have responded + % rather than wait around for a timeout if a user asks + % for a qourum with more than the available number of + % shards. + ?_test(begin + Acc0 = create_init_acc(4), + Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, + + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), + ?assertEqual(Expect, Resps), + ?assertEqual(accepted, resp_health(Resps)) + end). + + +t_mixed_ok_accepted() -> + ?_test(begin + WorkerUUIDs = [ + {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]}, + {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]}, + {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]}, + + {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]}, + {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]}, + {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]} + ], + + Acc0 = #acc{ + worker_uuids = WorkerUUIDs, + resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), + uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), + w = 2 + }, + + Msg1 = {ok, [{ok, [{1, <<"foo">>}]}]}, + Msg2 = {ok, [{ok, [{2, <<"bar">>}]}]}, + ExitMsg = {rexi_EXIT, blargh}, + + {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), + {ok, Acc2} = handle_message(Msg1, worker(2, Acc0), Acc1), + {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2), + {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3), + {stop, Acc5} = handle_message(Msg2, worker(6, Acc0), Acc4), + + Expect = [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5), + ?assertEqual(Expect, Resps), + ?assertEqual(accepted, resp_health(Resps)) + end). + + +t_mixed_errors() -> + ?_test(begin + WorkerUUIDs = [ + {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]}, + {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]}, + {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]}, + + {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]}, + {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]}, + {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]} + ], + + Acc0 = #acc{ + worker_uuids = WorkerUUIDs, + resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), + uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), + w = 2 + }, + + Msg = {ok, [{ok, [{1, <<"foo">>}]}]}, + ExitMsg = {rexi_EXIT, blargh}, + + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), + {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2), + {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3), + {stop, Acc5} = handle_message(ExitMsg, worker(6, Acc0), Acc4), + + Expect = [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5), + ?assertEqual(Expect, Resps), + ?assertEqual(error, resp_health(Resps)) + end). + + +create_init_acc(W) -> + UUID1 = <<"uuid1">>, + UUID2 = <<"uuid2">>, + + Nodes = [node1, node2, node3], + Shards = mem3_util:create_partition_map(<<"foo">>, 3, 1, Nodes), + + % Create our worker_uuids. We're relying on the fact that + % we're using a fake Q=1 db so we don't have to worry + % about any hashing here. + WorkerUUIDs = lists:map(fun(Shard) -> + {Shard#shard{ref = erlang:make_ref()}, [UUID1, UUID2]} + end, Shards), + + #acc{ + worker_uuids = WorkerUUIDs, + resps = dict:from_list([{UUID1, []}, {UUID2, []}]), + uuid_counts = dict:from_list([{UUID1, 3}, {UUID2, 3}]), + w = W + }. + + +worker(N, #acc{worker_uuids = WorkerUUIDs}) -> + {Worker, _} = lists:nth(N, WorkerUUIDs), + Worker. + + +check_quorum(Acc, Expect) -> + dict:fold(fun(_Shard, Resps, _) -> + ?assertEqual(Expect, has_quorum(Resps, 3, Acc#acc.w)) + end, nil, Acc#acc.resps). + +-endif. |