summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2018-04-30 13:05:33 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2018-04-30 13:23:05 -0500
commit97d836e57f6161c0a99a7e910ec00d78ee0823ed (patch)
tree4ae117ea3c7cf96c357380b60e7cefbc299254c7
parenta252e384e33401c7fc50d1d9798756b8c7fdd4f6 (diff)
downloadcouchdb-97d836e57f6161c0a99a7e910ec00d78ee0823ed.tar.gz
[SQUERGE] Fix tests and code for fabric_doc_purge
-rw-r--r--src/fabric/src/fabric_doc_purge.erl629
1 files changed, 394 insertions, 235 deletions
diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl
index b51f28eec..61b84d81b 100644
--- a/src/fabric/src/fabric_doc_purge.erl
+++ b/src/fabric/src/fabric_doc_purge.erl
@@ -24,8 +24,8 @@
-record(acc, {
worker_uuids,
- req_count,
resps,
+ uuid_counts,
w
}).
@@ -34,7 +34,7 @@ go(_, [], _) ->
{ok, []};
go(DbName, IdsRevs, Options) ->
% Generate our purge requests of {UUID, DocId, Revs}
- {UUIDs, Reqs, Count} = create_reqs(IdsRevs, [], [], 0),
+ {UUIDs, Reqs} = create_reqs(IdsRevs, [], []),
% Fire off rexi workers for each shard.
{Workers, WorkerUUIDs} = dict:fold(fun(Shard, ShardReqs, {Ws, WUUIDs}) ->
@@ -46,12 +46,18 @@ go(DbName, IdsRevs, Options) ->
{[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]}
end, {[], []}, group_reqs_by_shard(DbName, Reqs)),
+ UUIDCounts = lists:foldl(fun({_Worker, WUUIDs}, CountAcc) ->
+ lists:foldl(fun(UUID, InnerCountAcc) ->
+ dict:update_counter(UUID, 1, InnerCountAcc)
+ end, CountAcc, WUUIDs)
+ end, dict:new(), WorkerUUIDs),
+
RexiMon = fabric_util:create_monitors(Workers),
Timeout = fabric_util:request_timeout(),
Acc0 = #acc{
worker_uuids = WorkerUUIDs,
- req_count = Count,
resps = dict:from_list([{UUID, []} || UUID <- UUIDs]),
+ uuid_counts = UUIDCounts,
w = w(DbName, Options)
},
Acc2 = try rexi_utils:recv(Workers, #shard.ref,
@@ -73,7 +79,8 @@ go(DbName, IdsRevs, Options) ->
rexi_monitor:stop(RexiMon)
end,
- {ok, format_resps(UUIDs, Acc2)}.
+ FinalResps = format_resps(UUIDs, Acc2),
+ {resp_health(FinalResps), FinalResps}.
handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
@@ -101,7 +108,6 @@ handle_message({ok, Replies}, Worker, Acc) ->
resps = Resps
} = Acc,
{value, {_W, UUIDs}, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
- couch_log:error("XKCD: ~p ~p :: ~p ~p", [length(UUIDs), length(Replies), UUIDs, Replies]),
NewResps = append_resps(UUIDs, Replies, Resps),
maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
@@ -109,14 +115,14 @@ handle_message({bad_request, Msg}, _, _) ->
throw({bad_request, Msg}).
-create_reqs([], UUIDs, Reqs, Count) ->
- {lists:reverse(UUIDs), lists:reverse(Reqs), Count};
+create_reqs([], UUIDs, Reqs) ->
+ {lists:reverse(UUIDs), lists:reverse(Reqs)};
-create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs, Count) ->
+create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs) ->
UUID = couch_uuids:new(),
NewUUIDs = [UUID | UUIDs],
NewReqs = [{UUID, Id, Revs} | Reqs],
- create_reqs(RestIdsRevs, NewUUIDs, NewReqs, Count + 1).
+ create_reqs(RestIdsRevs, NewUUIDs, NewReqs).
group_reqs_by_shard(DbName, Reqs) ->
@@ -151,11 +157,11 @@ append_resps([UUID | RestUUIDs], [Reply | RestReplies], Resps) ->
maybe_stop(#acc{worker_uuids = []} = Acc) ->
{stop, Acc};
-maybe_stop(#acc{resps = Resps, w = W} = Acc) ->
+maybe_stop(#acc{resps = Resps, uuid_counts = Counts, w = W} = Acc) ->
try
- dict:fold(fun(_UUID, UUIDResps, _) ->
- couch_log:error("XKCD: ~p ~p", [UUIDResps, W]),
- case has_quorum(UUIDResps, W) of
+ dict:fold(fun(UUID, UUIDResps, _) ->
+ UUIDCount = dict:fetch(UUID, Counts),
+ case has_quorum(UUIDResps, UUIDCount, W) of
true -> ok;
false -> throw(keep_going)
end
@@ -179,7 +185,9 @@ format_resps(UUIDs, #acc{} = Acc) ->
[{UUID, Error} | ReplyAcc];
_ ->
AllRevs = lists:usort(lists:flatten(OkReplies)),
- Health = if length(OkReplies) >= W -> ok; true -> accepted end,
+ IsOk = length(OkReplies) >= W
+ andalso length(lists:usort(OkReplies)) == 1,
+ Health = if IsOk -> ok; true -> accepted end,
[{UUID, {Health, AllRevs}} | ReplyAcc]
end
end,
@@ -190,224 +198,375 @@ format_resps(_UUIDs, Else) ->
Else.
-has_quorum([], W) when W > 0 ->
- false;
-has_quorum(_, W) when W =< 0 ->
- true;
-has_quorum([{ok, _} | Rest], W) when W > 0 ->
- has_quorum(Rest, W - 1).
-
-
-%% % eunits
-%% doc_purge_ok_test() ->
-%% meck:new(couch_log),
-%% meck:expect(couch_log, warning, fun(_,_) -> ok end),
-%% meck:expect(couch_log, notice, fun(_,_) -> ok end),
-%%
-%% Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
-%% UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
-%% Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
-%% UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
-%% UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
-%% Shards =
-%% mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
-%% Counters = dict:to_list(
-%% group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
-%% DocsDict = dict:new(),
-%%
-%% % ***test for W = 2
-%% AccW2 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
-%% Counters, DocsDict},
-%% {ok, {WaitingCountW2_1,_,_,_,_} = AccW2_1} =
-%% handle_message({ok,[{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW2),
-%% ?assertEqual(2, WaitingCountW2_1),
-%% {stop, FinalReplyW2 } =
-%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
-%% lists:nth(2,Shards), AccW2_1),
-%% ?assertEqual(
-%% {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
-%% FinalReplyW2
-%% ),
-%%
-%% % ***test for W = 3
-%% AccW3 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
-%% Counters, DocsDict},
-%% {ok, {WaitingCountW3_1,_,_,_,_} = AccW3_1} =
-%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), AccW3),
-%% ?assertEqual(2, WaitingCountW3_1),
-%% {ok, {WaitingCountW3_2,_,_,_,_} = AccW3_2} =
-%% handle_message({ok,[{ok, Revs1}, {ok, Revs2}]},
-%% lists:nth(2,Shards), AccW3_1),
-%% ?assertEqual(1, WaitingCountW3_2),
-%% {stop, FinalReplyW3 } =
-%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
-%% lists:nth(3,Shards), AccW3_2),
-%% ?assertEqual(
-%% {ok, [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
-%% FinalReplyW3
-%% ),
-%%
-%% % *** test rexi_exit on 1 node
-%% Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
-%% Counters, DocsDict},
-%% {ok, {WaitingCount1,_,_,_,_} = Acc1} =
-%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0),
-%% ?assertEqual(2, WaitingCount1),
-%% {ok, {WaitingCount2,_,_,_,_} = Acc2} =
-%% handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
-%% ?assertEqual(1, WaitingCount2),
-%% {stop, Reply} =
-%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
-%% lists:nth(3,Shards), Acc2),
-%% ?assertEqual(
-%% {ok,[{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}]},
-%% Reply
-%% ),
-%%
-%% % *** test {error, purge_during_compaction_exceeded_limit} on all nodes
-%% % *** still should return ok reply for the request
-%% ErrPDCEL = {error, purge_during_compaction_exceeded_limit},
-%% Acc20 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
-%% Counters, DocsDict},
-%% {ok, {WaitingCount21,_,_,_,_} = Acc21} =
-%% handle_message({ok, [ErrPDCEL, ErrPDCEL]}, hd(Shards), Acc20),
-%% ?assertEqual(2, WaitingCount21),
-%% {ok, {WaitingCount22,_,_,_,_} = Acc22} =
-%% handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(2,Shards), Acc21),
-%% ?assertEqual(1, WaitingCount22),
-%% {stop, Reply2 } =
-%% handle_message({ok, [ErrPDCEL, ErrPDCEL]}, lists:nth(3,Shards), Acc22),
-%% ?assertEqual(
-%% {ok, [{UUID1, ErrPDCEL}, {UUID2, ErrPDCEL}]},
-%% Reply2
-%% ),
-%%
-%% % *** test {error, purged_docs_limit_exceeded} on all nodes
-%% % *** still should return ok reply for the request
-%% ErrPDLE = {error, purged_docs_limit_exceeded},
-%% Acc30 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
-%% Counters, DocsDict},
-%% {ok, {WaitingCount31,_,_,_,_} = Acc31} =
-%% handle_message({ok, [ErrPDLE, ErrPDLE]}, hd(Shards), Acc30),
-%% ?assertEqual(2, WaitingCount31),
-%% {ok, {WaitingCount32,_,_,_,_} = Acc32} =
-%% handle_message({ok, [ErrPDLE, ErrPDLE]}, lists:nth(2,Shards), Acc31),
-%% ?assertEqual(1, WaitingCount32),
-%% {stop, Reply3 } =
-%% handle_message({ok, [ErrPDLE, ErrPDLE]},lists:nth(3,Shards), Acc32),
-%% ?assertEqual(
-%% {ok, [{UUID1, ErrPDLE}, {UUID2, ErrPDLE}]},
-%% Reply3
-%% ),
-%% meck:unload(couch_log).
-%%
-%%
-%% doc_purge_accepted_test() ->
-%% meck:new(couch_log),
-%% meck:expect(couch_log, warning, fun(_,_) -> ok end),
-%% meck:expect(couch_log, notice, fun(_,_) -> ok end),
-%%
-%% Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
-%% UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
-%% Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
-%% UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
-%% UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
-%% Shards =
-%% mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
-%% Counters = dict:to_list(
-%% group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
-%% DocsDict = dict:new(),
-%%
-%% % *** test rexi_exit on 2 nodes
-%% Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
-%% Counters, DocsDict},
-%% {ok, {WaitingCount1,_,_,_,_} = Acc1} =
-%% handle_message({ok, [{ok, Revs1}, {ok, Revs2}]}, hd(Shards), Acc0),
-%% ?assertEqual(2, WaitingCount1),
-%% {ok, {WaitingCount2,_,_,_,_} = Acc2} =
-%% handle_message({rexi_EXIT, nil}, lists:nth(2, Shards), Acc1),
-%% ?assertEqual(1, WaitingCount2),
-%% {stop, Reply} =
-%% handle_message({rexi_EXIT, nil}, lists:nth(3, Shards), Acc2),
-%% ?assertEqual(
-%% {accepted, [{UUID1, {accepted, Revs1}}, {UUID2, {accepted, Revs2}}]},
-%% Reply
-%% ),
-%% meck:unload(couch_log).
-%%
-%%
-%% doc_purge_error_test() ->
-%% meck:new(couch_log),
-%% meck:expect(couch_log, warning, fun(_,_) -> ok end),
-%% meck:expect(couch_log, notice, fun(_,_) -> ok end),
-%%
-%% Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
-%% UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
-%% Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
-%% UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
-%% UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
-%% Shards =
-%% mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
-%% Counters = dict:to_list(
-%% group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
-%% DocsDict = dict:new(),
-%%
-%% % *** test rexi_exit on all 3 nodes
-%% Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
-%% Counters, DocsDict},
-%% {ok, {WaitingCount1,_,_,_,_} = Acc1} =
-%% handle_message({rexi_EXIT, nil}, hd(Shards), Acc0),
-%% ?assertEqual(2, WaitingCount1),
-%% {ok, {WaitingCount2,_,_,_,_} = Acc2} =
-%% handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
-%% ?assertEqual(1, WaitingCount2),
-%% {stop, Reply} =
-%% handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2),
-%% ?assertEqual(
-%% {error, [{UUID1, {error, internal_server_error}},
-%% {UUID2, {error, internal_server_error}}]},
-%% Reply
-%% ),
-%%
-%% % ***test w quorum > # shards, which should fail immediately
-%% Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]),
-%% Counters2 = dict:to_list(
-%% group_idrevs_by_shard_hack(<<"foo">>, Shards2, UUIDsIDdsRevs)),
-%% AccW4 = {length(Shards2), length(UUIDsIDdsRevs), list_to_integer("2"),
-%% Counters2, DocsDict},
-%% Bool =
-%% case handle_message({ok, [{ok, Revs1}, {ok, Revs2}]},
-%% hd(Shards), AccW4) of
-%% {stop, _Reply} ->
-%% true;
-%% _ -> false
-%% end,
-%% ?assertEqual(true, Bool),
-%%
-%% % *** test Docs with no replies should end up as {error, internal_server_error}
-%% SA1 = #shard{node = a, range = [1]},
-%% SA2 = #shard{node = a, range = [2]},
-%% SB1 = #shard{node = b, range = [1]},
-%% SB2 = #shard{node = b, range = [2]},
-%% Counters3 = [{SA1,[UUID1]}, {SB1,[UUID1]},
-%% {SA2,[UUID2]}, {SB2,[UUID2]}],
-%% Acc30 = {length(Counters3), length(UUIDsIDdsRevs), 2, Counters3, DocsDict},
-%% {ok, Acc31} = handle_message({ok, [{ok, Revs1}]}, SA1, Acc30),
-%% {ok, Acc32} = handle_message({rexi_EXIT, nil}, SB1, Acc31),
-%% {ok, Acc33} = handle_message({rexi_EXIT, nil}, SA2, Acc32),
-%% {stop, Acc34} = handle_message({rexi_EXIT, nil}, SB2, Acc33),
-%% ?assertEqual(
-%% {error, [{UUID1, {accepted, Revs1}},
-%% {UUID2, {error, internal_server_error}}]},
-%% Acc34
-%% ),
-%% meck:unload(couch_log).
-%%
-%%
-%% % needed for testing to avoid having to start the mem3 application
-%% group_idrevs_by_shard_hack(_DbName, Shards, UUIDsIdsRevs) ->
-%% lists:foldl(fun({UUID, _Id, _Revs}, Dict0) ->
-%% lists:foldl(fun(Shard, Dict1) ->
-%% dict:append(Shard, UUID, Dict1)
-%% end, Dict0, Shards)
-%% end, dict:new(), UUIDsIdsRevs).
+resp_health(Resps) ->
+ Healths = lists:usort([H || {H, _} <- Resps]),
+ HasError = lists:member(error, Healths),
+ HasAccepted = lists:member(accepted, Healths),
+ AllOk = Healths == [ok],
+ if
+ HasError -> error;
+ HasAccepted -> accepted;
+ AllOk -> ok;
+ true -> error
+ end.
+
+
+has_quorum(Resps, Count, W) ->
+ OkResps = [R || {ok, _} = R <- Resps],
+ OkCounts = lists:foldl(fun(R, Acc) ->
+ dict:update_counter(R, 1, Acc)
+ end, dict:new(), OkResps),
+ MaxOk = lists:max([0 | element(2, lists:unzip(dict:to_list(OkCounts)))]),
+ if
+ MaxOk >= W -> true;
+ length(Resps) >= Count -> true;
+ true -> false
+ end.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+purge_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ t_w2_ok(),
+ t_w3_ok(),
+
+ t_w2_mixed_accepted(),
+ t_w3_mixed_accepted(),
+
+ t_w2_exit1_ok(),
+ t_w2_exit2_accepted(),
+ t_w2_exit3_error(),
+
+ t_w4_accepted(),
+
+ t_mixed_ok_accepted(),
+ t_mixed_errors()
+ ]
+ }.
+
+
+setup() ->
+ meck:new(couch_log),
+ meck:expect(couch_log, warning, fun(_, _) -> ok end),
+ meck:expect(couch_log, notice, fun(_, _) -> ok end).
+
+
+teardown(_) ->
+ meck:unload().
+
+
+t_w2_ok() ->
+ ?_test(begin
+ Acc0 = create_init_acc(2),
+ Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+ {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {stop, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, true),
+
+ Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(ok, resp_health(Resps))
+ end).
+
+
+t_w3_ok() ->
+ ?_test(begin
+ Acc0 = create_init_acc(3),
+ Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+ {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(ok, resp_health(Resps))
+ end).
+
+
+t_w2_mixed_accepted() ->
+ ?_test(begin
+ Acc0 = create_init_acc(2),
+ Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
+ Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
+
+ {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [
+ {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
+ {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
+ ],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(accepted, resp_health(Resps))
+ end).
+
+
+t_w3_mixed_accepted() ->
+ ?_test(begin
+ Acc0 = create_init_acc(3),
+ Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
+ Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
+
+ {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [
+ {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
+ {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
+ ],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(accepted, resp_health(Resps))
+ end).
+
+
+t_w2_exit1_ok() ->
+ ?_test(begin
+ Acc0 = create_init_acc(2),
+ Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+ ExitMsg = {rexi_EXIT, blargh},
+
+ {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(ok, resp_health(Resps))
+ end).
+
+
+t_w2_exit2_accepted() ->
+ ?_test(begin
+ Acc0 = create_init_acc(2),
+ Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+ ExitMsg = {rexi_EXIT, blargh},
+
+ {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(accepted, resp_health(Resps))
+ end).
+
+
+t_w2_exit3_error() ->
+ ?_test(begin
+ Acc0 = create_init_acc(2),
+ ExitMsg = {rexi_EXIT, blargh},
+
+ {ok, Acc1} = handle_message(ExitMsg, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [
+ {error, internal_server_error},
+ {error, internal_server_error}
+ ],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(error, resp_health(Resps))
+ end).
+
+
+t_w4_accepted() ->
+ % Make sure we return when all workers have responded
+ % rather than wait around for a timeout if a user asks
+ % for a qourum with more than the available number of
+ % shards.
+ ?_test(begin
+ Acc0 = create_init_acc(4),
+ Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+ {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(accepted, resp_health(Resps))
+ end).
+
+
+t_mixed_ok_accepted() ->
+ ?_test(begin
+ WorkerUUIDs = [
+ {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
+ {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
+ {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
+
+ {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
+ {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
+ {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
+ ],
+
+ Acc0 = #acc{
+ worker_uuids = WorkerUUIDs,
+ resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+ uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+ w = 2
+ },
+
+ Msg1 = {ok, [{ok, [{1, <<"foo">>}]}]},
+ Msg2 = {ok, [{ok, [{2, <<"bar">>}]}]},
+ ExitMsg = {rexi_EXIT, blargh},
+
+ {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+ {ok, Acc2} = handle_message(Msg1, worker(2, Acc0), Acc1),
+ {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
+ {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
+ {stop, Acc5} = handle_message(Msg2, worker(6, Acc0), Acc4),
+
+ Expect = [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(accepted, resp_health(Resps))
+ end).
+
+
+t_mixed_errors() ->
+ ?_test(begin
+ WorkerUUIDs = [
+ {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
+ {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
+ {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
+
+ {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
+ {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
+ {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
+ ],
+
+ Acc0 = #acc{
+ worker_uuids = WorkerUUIDs,
+ resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+ uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+ w = 2
+ },
+
+ Msg = {ok, [{ok, [{1, <<"foo">>}]}]},
+ ExitMsg = {rexi_EXIT, blargh},
+
+ {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+ {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+ {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
+ {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
+ {stop, Acc5} = handle_message(ExitMsg, worker(6, Acc0), Acc4),
+
+ Expect = [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(error, resp_health(Resps))
+ end).
+
+
+create_init_acc(W) ->
+ UUID1 = <<"uuid1">>,
+ UUID2 = <<"uuid2">>,
+
+ Nodes = [node1, node2, node3],
+ Shards = mem3_util:create_partition_map(<<"foo">>, 3, 1, Nodes),
+
+ % Create our worker_uuids. We're relying on the fact that
+ % we're using a fake Q=1 db so we don't have to worry
+ % about any hashing here.
+ WorkerUUIDs = lists:map(fun(Shard) ->
+ {Shard#shard{ref = erlang:make_ref()}, [UUID1, UUID2]}
+ end, Shards),
+
+ #acc{
+ worker_uuids = WorkerUUIDs,
+ resps = dict:from_list([{UUID1, []}, {UUID2, []}]),
+ uuid_counts = dict:from_list([{UUID1, 3}, {UUID2, 3}]),
+ w = W
+ }.
+
+
+worker(N, #acc{worker_uuids = WorkerUUIDs}) ->
+ {Worker, _} = lists:nth(N, WorkerUUIDs),
+ Worker.
+
+
+check_quorum(Acc, Expect) ->
+ dict:fold(fun(_Shard, Resps, _) ->
+ ?assertEqual(Expect, has_quorum(Resps, 3, Acc#acc.w))
+ end, nil, Acc#acc.resps).
+
+-endif.