author    | Paul J. Davis <paul.joseph.davis@gmail.com> | 2018-04-24 12:27:43 -0500
committer | jiangph <jiangph@cn.ibm.com>                | 2018-08-22 00:59:16 +0800
commit    | 932dd93b0e6edaa5ba2eb3775925301e950657ee (patch)
tree      | e4316afdb45c1bd34a84870d6d6a206e4deb318c
parent    | 0234679661383fd71dd7d15a51d4454d138f0e90 (diff)
download  | couchdb-932dd93b0e6edaa5ba2eb3775925301e950657ee.tar.gz
[09/10] Clustered Purge: Fabric API
This commit implements the clustered API for performing purge requests.
The change should be straightforward for anyone already familiar with
the general implementation of a fabric coordinator, given that the
purge API is fairly simple.
COUCHDB-3326
Co-authored-by: Mayya Sharipova <mayyas@ca.ibm.com>
Co-authored-by: jiangphcn <jiangph@cn.ibm.com>
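
Usage note (not part of the commit): a minimal sketch of a caller of the new fabric:purge_docs/3 API introduced below. The database name, document id, and revision are made-up values, and it assumes revisions may be passed as binary rev strings, which fabric's existing idrevs/rev helpers normalize.

    %% Hypothetical caller, illustration only.
    purge_example() ->
        DbName = <<"mydb">>,
        IdsRevs = [{<<"some-doc">>, [<<"1-967a00dff5e02add41819138abb3284d">>]}],
        case fabric:purge_docs(DbName, IdsRevs, []) of
            {ok, Results} ->
                %% every request reached a quorum of shard copies
                {ok, Results};
            {accepted, Results} ->
                %% applied, but on fewer than W copies for at least one request
                {accepted, Results};
            {error, Results} ->
                {error, Results}
        end.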
-rw-r--r-- | src/fabric/src/fabric.erl           |  27
-rw-r--r-- | src/fabric/src/fabric_db_info.erl   |  29
-rw-r--r-- | src/fabric/src/fabric_db_meta.erl   |  26
-rw-r--r-- | src/fabric/src/fabric_doc_purge.erl | 572
4 files changed, 638 insertions, 16 deletions
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 5ad9b459d..cf9ad9428 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -21,12 +21,13 @@
     delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
     set_security/2, set_security/3, get_revs_limit/1, get_security/1,
     get_security/2, get_all_security/1, get_all_security/2,
+    get_purge_infos_limit/1, set_purge_infos_limit/3,
     compact/1, compact/2]).
 
 % Documents
 -export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3,
     get_missing_revs/2, get_missing_revs/3, update_doc/3, update_docs/3,
-    purge_docs/2, att_receiver/2]).
+    purge_docs/3, att_receiver/2]).
 
 % Views
 -export([all_docs/4, all_docs/5, changes/4, query_view/3, query_view/4,
@@ -137,6 +138,18 @@ set_security(DbName, SecObj) ->
 set_security(DbName, SecObj, Options) ->
     fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)).
 
+%% @doc sets the upper bound for the number of stored purge requests
+-spec set_purge_infos_limit(dbname(), pos_integer(), [option()]) -> ok.
+set_purge_infos_limit(DbName, Limit, Options)
+        when is_integer(Limit), Limit > 0 ->
+    fabric_db_meta:set_purge_infos_limit(dbname(DbName), Limit, opts(Options)).
+
+%% @doc retrieves the upper bound for the number of stored purge requests
+-spec get_purge_infos_limit(dbname()) -> pos_integer() | no_return().
+get_purge_infos_limit(DbName) ->
+    {ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]),
+    try couch_db:get_purge_infos_limit(Db) after catch couch_db:close(Db) end.
+
 get_security(DbName) ->
     get_security(DbName, [?ADMIN_CTX]).
 
@@ -271,8 +284,16 @@ update_docs(DbName, Docs, Options) ->
             {aborted, PreCommitFailures}
     end.
 
-purge_docs(_DbName, _IdsRevs) ->
-    not_implemented.
+
+%% @doc purge revisions for a list '{Id, Revs}'
+%% returns {ok, {PurgeSeq, Results}}
+-spec purge_docs(dbname(), [{docid(), [revision()]}], [option()]) ->
+    {ok, [{Health, [revision()]}] | {error, any()}} when
+    Health :: ok | accepted.
+purge_docs(DbName, IdsRevs, Options) when is_list(IdsRevs) ->
+    IdsRevs2 = [idrevs(IdRs) || IdRs <- IdsRevs],
+    fabric_doc_purge:go(dbname(DbName), IdsRevs2, opts(Options)).
+
 
 %% @doc spawns a process to upload attachment data and
 %% returns a fabric attachment receiver context tuple
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index 98e8e52e4..97a31c237 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -23,10 +23,12 @@ go(DbName) ->
     RexiMon = fabric_util:create_monitors(Shards),
     Fun = fun handle_message/3,
     {ok, ClusterInfo} = get_cluster_info(Shards),
-    Acc0 = {fabric_dict:init(Workers, nil), [{cluster, ClusterInfo}]},
+    Acc0 = {fabric_dict:init(Workers, nil), [], [{cluster, ClusterInfo}]},
     try
         case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
-        {ok, Acc} -> {ok, Acc};
+
+        {ok, Acc} ->
+            {ok, Acc};
         {timeout, {WorkersDict, _}} ->
             DefunctWorkers = fabric_util:remove_done_workers(
                 WorkersDict,
@@ -37,44 +39,49 @@ go(DbName) ->
                 "get_db_info"
             ),
             {error, timeout};
-        {error, Error} -> throw(Error)
+        {error, Error} ->
+            throw(Error)
         end
     after
         rexi_monitor:stop(RexiMon)
     end.
 
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
+handle_message({rexi_DOWN,
+        _, {_,NodeRef},_}, _Shard, {Counters, PseqAcc, Acc}) ->
     case fabric_util:remove_down_workers(Counters, NodeRef) of
     {ok, NewCounters} ->
-        {ok, {NewCounters, Acc}};
+        {ok, {NewCounters, PseqAcc, Acc}};
     error ->
         {error, {nodedown, <<"progress not possible">>}}
     end;
 
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, PseqAcc, Acc}) ->
     NewCounters = fabric_dict:erase(Shard, Counters),
     case fabric_view:is_progress_possible(NewCounters) of
     true ->
-        {ok, {NewCounters, Acc}};
+        {ok, {NewCounters, PseqAcc, Acc}};
     false ->
         {error, Reason}
     end;
 
-handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
+handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, PseqAcc, Acc}) ->
     case fabric_dict:lookup_element(Shard, Counters) of
     undefined ->
         % already heard from someone else in this range
-        {ok, {Counters, Acc}};
+        {ok, {Counters, PseqAcc, Acc}};
     nil ->
         Seq = couch_util:get_value(update_seq, Info),
         C1 = fabric_dict:store(Shard, Seq, Counters),
         C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+        PSeq = couch_util:get_value(purge_seq, Info),
+        NewPseqAcc = [{Shard, PSeq}|PseqAcc],
         case fabric_dict:any(nil, C2) of
         true ->
-            {ok, {C2, [Info|Acc]}};
+            {ok, {C2, NewPseqAcc, [Info|Acc]}};
         false ->
             {stop, [
                 {db_name,Name},
+                {purge_seq, fabric_view_changes:pack_seqs(NewPseqAcc)},
                 {update_seq, fabric_view_changes:pack_seqs(C2)} |
                 merge_results(lists:flatten([Info|Acc]))
             ]}
@@ -91,8 +98,6 @@ merge_results(Info) ->
             [{doc_count, lists:sum(X)} | Acc];
         (doc_del_count, X, Acc) ->
             [{doc_del_count, lists:sum(X)} | Acc];
-        (purge_seq, X, Acc) ->
-            [{purge_seq, lists:sum(X)} | Acc];
         (compact_running, X, Acc) ->
             [{compact_running, lists:member(true, X)} | Acc];
         (disk_size, X, Acc) -> % legacy
diff --git a/src/fabric/src/fabric_db_meta.erl b/src/fabric/src/fabric_db_meta.erl
index 367ef06e9..26e1b3752 100644
--- a/src/fabric/src/fabric_db_meta.erl
+++ b/src/fabric/src/fabric_db_meta.erl
@@ -12,7 +12,8 @@
 -module(fabric_db_meta).
 
--export([set_revs_limit/3, set_security/3, get_all_security/2]).
+-export([set_revs_limit/3, set_security/3, get_all_security/2,
+    set_purge_infos_limit/3]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 
@@ -48,6 +49,29 @@ handle_revs_message(Error, _, _Acc) ->
     {error, Error}.
 
 
+set_purge_infos_limit(DbName, Limit, Options) ->
+    Shards = mem3:shards(DbName),
+    Workers = fabric_util:submit_jobs(Shards, set_purge_infos_limit, [Limit, Options]),
+    Handler = fun handle_purge_message/3,
+    Acc0 = {Workers, length(Workers) - 1},
+    case fabric_util:recv(Workers, #shard.ref, Handler, Acc0) of
+        {ok, ok} ->
+            ok;
+        {timeout, {DefunctWorkers, _}} ->
+            fabric_util:log_timeout(DefunctWorkers, "set_purged_docs_limit"),
+            {error, timeout};
+        Error ->
+            Error
+    end.
+
+handle_purge_message(ok, _, {_Workers, 0}) ->
+    {stop, ok};
+handle_purge_message(ok, Worker, {Workers, Waiting}) ->
+    {ok, {lists:delete(Worker, Workers), Waiting - 1}};
+handle_purge_message(Error, _, _Acc) ->
+    {error, Error}.
+
+
 set_security(DbName, SecObj, Options) ->
     Shards = mem3:shards(DbName),
     RexiMon = fabric_util:create_monitors(Shards),
diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl
new file mode 100644
index 000000000..2571d0d7f
--- /dev/null
+++ b/src/fabric/src/fabric_doc_purge.erl
@@ -0,0 +1,572 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_purge).
+
+
+-export([
+    go/3
+]).
+
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+-record(acc, {
+    worker_uuids,
+    resps,
+    uuid_counts,
+    w
+}).
+
+
+go(_, [], _) ->
+    {ok, []};
+go(DbName, IdsRevs, Options) ->
+    % Generate our purge requests of {UUID, DocId, Revs}
+    {UUIDs, Reqs} = create_reqs(IdsRevs, [], []),
+
+    % Fire off rexi workers for each shard.
+    {Workers, WorkerUUIDs} = dict:fold(fun(Shard, ShardReqs, {Ws, WUUIDs}) ->
+        #shard{name = ShardDbName, node = Node} = Shard,
+        Args = [ShardDbName, ShardReqs, Options],
+        Ref = rexi:cast(Node, {fabric_rpc, purge_docs, Args}),
+        Worker = Shard#shard{ref=Ref},
+        ShardUUIDs = [UUID || {UUID, _Id, _Revs} <- ShardReqs],
+        {[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]}
+    end, {[], []}, group_reqs_by_shard(DbName, Reqs)),
+
+    UUIDCounts = lists:foldl(fun({_Worker, WUUIDs}, CountAcc) ->
+        lists:foldl(fun(UUID, InnerCountAcc) ->
+            dict:update_counter(UUID, 1, InnerCountAcc)
+        end, CountAcc, WUUIDs)
+    end, dict:new(), WorkerUUIDs),
+
+    RexiMon = fabric_util:create_monitors(Workers),
+    Timeout = fabric_util:request_timeout(),
+    Acc0 = #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = dict:from_list([{UUID, []} || UUID <- UUIDs]),
+        uuid_counts = UUIDCounts,
+        w = w(DbName, Options)
+    },
+    Acc2 = try rexi_utils:recv(Workers, #shard.ref,
+            fun handle_message/3, Acc0, infinity, Timeout) of
+        {ok, Acc1} ->
+            Acc1;
+        {timeout, Acc1} ->
+            #acc{
+                worker_uuids = WorkerUUIDs,
+                resps = Resps
+            } = Acc1,
+            DefunctWorkers = [Worker || {Worker, _} <- WorkerUUIDs],
+            fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
+            NewResps = append_errors(timeout, WorkerUUIDs, Resps),
+            Acc1#acc{worker_uuids = [], resps = NewResps};
+        Else ->
+            Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end,
+
+    FinalResps = format_resps(UUIDs, Acc2),
+    {resp_health(FinalResps), FinalResps}.
+
+
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    Pred = fun({#shard{node = N}, _}) -> N == Node end,
+    {Failed, Rest} = lists:partition(Pred, WorkerUUIDs),
+    NewResps = append_errors(internal_server_error, Failed, Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({rexi_EXIT, _}, Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    {value, WorkerPair, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+    NewResps = append_errors(internal_server_error, [WorkerPair], Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({ok, Replies}, Worker, Acc) ->
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = Resps
+    } = Acc,
+    {value, {_W, UUIDs}, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+    NewResps = append_resps(UUIDs, Replies, Resps),
+    maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({bad_request, Msg}, _, _) ->
+    throw({bad_request, Msg}).
+
+
+create_reqs([], UUIDs, Reqs) ->
+    {lists:reverse(UUIDs), lists:reverse(Reqs)};
+
+create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs) ->
+    UUID = couch_uuids:new(),
+    NewUUIDs = [UUID | UUIDs],
+    NewReqs = [{UUID, Id, Revs} | Reqs],
+    create_reqs(RestIdsRevs, NewUUIDs, NewReqs).
+
+
+group_reqs_by_shard(DbName, Reqs) ->
+    lists:foldl(fun({_UUID, Id, _Revs} = Req, D0) ->
+        lists:foldl(fun(Shard, D1) ->
+            dict:append(Shard, Req, D1)
+        end, D0, mem3:shards(DbName, Id))
+    end, dict:new(), Reqs).
+
+
+w(DbName, Options) ->
+    try
+        list_to_integer(couch_util:get_value(w, Options))
+    catch _:_ ->
+        mem3:quorum(DbName)
+    end.
+
+
+append_errors(Type, WorkerUUIDs, Resps) ->
+    lists:foldl(fun({_Worker, UUIDs}, RespAcc) ->
+        Errors = [{error, Type} || _UUID <- UUIDs],
+        append_resps(UUIDs, Errors, RespAcc)
+    end, Resps, WorkerUUIDs).
+
+
+append_resps([], [], Resps) ->
+    Resps;
+append_resps([UUID | RestUUIDs], [Reply | RestReplies], Resps) ->
+    NewResps = dict:append(UUID, Reply, Resps),
+    append_resps(RestUUIDs, RestReplies, NewResps).
+
+
+maybe_stop(#acc{worker_uuids = []} = Acc) ->
+    {stop, Acc};
+maybe_stop(#acc{resps = Resps, uuid_counts = Counts, w = W} = Acc) ->
+    try
+        dict:fold(fun(UUID, UUIDResps, _) ->
+            UUIDCount = dict:fetch(UUID, Counts),
+            case has_quorum(UUIDResps, UUIDCount, W) of
+                true -> ok;
+                false -> throw(keep_going)
+            end
+        end, nil, Resps),
+        {stop, Acc}
+    catch throw:keep_going ->
+        {ok, Acc}
+    end.
+
+
+format_resps(UUIDs, #acc{} = Acc) ->
+    #acc{
+        resps = Resps,
+        w = W
+    } = Acc,
+    FoldFun = fun(UUID, Replies, ReplyAcc) ->
+        OkReplies = [Reply || {ok, Reply} <- Replies],
+        case OkReplies of
+            [] ->
+                [Error | _] = lists:usort(Replies),
+                [{UUID, Error} | ReplyAcc];
+            _ ->
+                AllRevs = lists:usort(lists:flatten(OkReplies)),
+                IsOk = length(OkReplies) >= W
+                        andalso length(lists:usort(OkReplies)) == 1,
+                Health = if IsOk -> ok; true -> accepted end,
+                [{UUID, {Health, AllRevs}} | ReplyAcc]
+        end
+    end,
+    FinalReplies = dict:fold(FoldFun, {ok, []}, Resps),
+    couch_util:reorder_results(UUIDs, FinalReplies);
+
+format_resps(_UUIDs, Else) ->
+    Else.
+
+
+resp_health(Resps) ->
+    Healths = lists:usort([H || {H, _} <- Resps]),
+    HasError = lists:member(error, Healths),
+    HasAccepted = lists:member(accepted, Healths),
+    AllOk = Healths == [ok],
+    if
+        HasError -> error;
+        HasAccepted -> accepted;
+        AllOk -> ok;
+        true -> error
+    end.
+
+
+has_quorum(Resps, Count, W) ->
+    OkResps = [R || {ok, _} = R <- Resps],
+    OkCounts = lists:foldl(fun(R, Acc) ->
+        orddict:update_counter(R, 1, Acc)
+    end, orddict:new(), OkResps),
+    MaxOk = lists:max([0 | element(2, lists:unzip(OkCounts))]),
+    if
+        MaxOk >= W -> true;
+        length(Resps) >= Count -> true;
+        true -> false
+    end.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+purge_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            t_w2_ok(),
+            t_w3_ok(),
+
+            t_w2_mixed_accepted(),
+            t_w3_mixed_accepted(),
+
+            t_w2_exit1_ok(),
+            t_w2_exit2_accepted(),
+            t_w2_exit3_error(),
+
+            t_w4_accepted(),
+
+            t_mixed_ok_accepted(),
+            t_mixed_errors()
+        ]
+    }.
+
+
+setup() ->
+    meck:new(couch_log),
+    meck:expect(couch_log, warning, fun(_, _) -> ok end),
+    meck:expect(couch_log, notice, fun(_, _) -> ok end).
+
+
+teardown(_) ->
+    meck:unload().
+
+
+t_w2_ok() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {stop, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, true),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(ok, resp_health(Resps))
+    end).
+
+
+t_w3_ok() ->
+    ?_test(begin
+        Acc0 = create_init_acc(3),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(ok, resp_health(Resps))
+    end).
+
+
+t_w2_mixed_accepted() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
+        Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [
+            {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
+            {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
+        ],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_w3_mixed_accepted() ->
+    ?_test(begin
+        Acc0 = create_init_acc(3),
+        Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
+        Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [
+            {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
+            {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
+        ],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_w2_exit1_ok() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(ok, resp_health(Resps))
+    end).
+
+
+t_w2_exit2_accepted() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_w2_exit3_error() ->
+    ?_test(begin
+        Acc0 = create_init_acc(2),
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(ExitMsg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [
+            {error, internal_server_error},
+            {error, internal_server_error}
+        ],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(error, resp_health(Resps))
+    end).
+
+
+t_w4_accepted() ->
+    % Make sure we return when all workers have responded
+    % rather than wait around for a timeout if a user asks
+    % for a qourum with more than the available number of
+    % shards.
+    ?_test(begin
+        Acc0 = create_init_acc(4),
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+        check_quorum(Acc1, false),
+
+        {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+        check_quorum(Acc2, false),
+
+        {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+        check_quorum(Acc3, true),
+
+        Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_mixed_ok_accepted() ->
+    ?_test(begin
+        WorkerUUIDs = [
+            {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
+            {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
+            {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
+
+            {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
+            {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
+            {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
+        ],
+
+        Acc0 = #acc{
+            worker_uuids = WorkerUUIDs,
+            resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+            uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+            w = 2
+        },
+
+        Msg1 = {ok, [{ok, [{1, <<"foo">>}]}]},
+        Msg2 = {ok, [{ok, [{2, <<"bar">>}]}]},
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+        {ok, Acc2} = handle_message(Msg1, worker(2, Acc0), Acc1),
+        {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
+        {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
+        {stop, Acc5} = handle_message(Msg2, worker(6, Acc0), Acc4),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(accepted, resp_health(Resps))
+    end).
+
+
+t_mixed_errors() ->
+    ?_test(begin
+        WorkerUUIDs = [
+            {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
+            {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
+            {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
+
+            {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
+            {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
+            {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
+        ],
+
+        Acc0 = #acc{
+            worker_uuids = WorkerUUIDs,
+            resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+            uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+            w = 2
+        },
+
+        Msg = {ok, [{ok, [{1, <<"foo">>}]}]},
+        ExitMsg = {rexi_EXIT, blargh},
+
+        {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+        {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+        {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
+        {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
+        {stop, Acc5} = handle_message(ExitMsg, worker(6, Acc0), Acc4),
+
+        Expect = [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}],
+        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
+        ?assertEqual(Expect, Resps),
+        ?assertEqual(error, resp_health(Resps))
+    end).
+
+
+create_init_acc(W) ->
+    UUID1 = <<"uuid1">>,
+    UUID2 = <<"uuid2">>,
+
+    Nodes = [node1, node2, node3],
+    Shards = mem3_util:create_partition_map(<<"foo">>, 3, 1, Nodes),
+
+    % Create our worker_uuids. We're relying on the fact that
+    % we're using a fake Q=1 db so we don't have to worry
+    % about any hashing here.
+    WorkerUUIDs = lists:map(fun(Shard) ->
+        {Shard#shard{ref = erlang:make_ref()}, [UUID1, UUID2]}
+    end, Shards),
+
+    #acc{
+        worker_uuids = WorkerUUIDs,
+        resps = dict:from_list([{UUID1, []}, {UUID2, []}]),
+        uuid_counts = dict:from_list([{UUID1, 3}, {UUID2, 3}]),
+        w = W
+    }.
+
+
+worker(N, #acc{worker_uuids = WorkerUUIDs}) ->
+    {Worker, _} = lists:nth(N, WorkerUUIDs),
+    Worker.
+
+
+check_quorum(Acc, Expect) ->
+    dict:fold(fun(_Shard, Resps, _) ->
+        ?assertEqual(Expect, has_quorum(Resps, 3, Acc#acc.w))
+    end, nil, Acc#acc.resps).
+
+-endif.
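
Reader's note (not part of the commit): a hedged sketch of consuming the value returned by fabric_doc_purge:go/3. Per format_resps/2 and resp_health/1 above, the coordinator returns {Health, Results} where Health is ok, accepted, or error, and each element of Results is {ok, Revs}, {accepted, Revs}, or {error, Reason} in request order. The module and function names below are hypothetical.

    -module(purge_result_example).
    -export([summarize/1]).

    %% Hypothetical helper: fold the ordered per-request results into
    %% {Health, {OkCount, AcceptedCount, ErrorCount}}. Illustration only.
    summarize({Health, Results}) when is_list(Results) ->
        Counts = lists:foldl(fun
            ({ok, _Revs}, {Ok, Acc, Err}) -> {Ok + 1, Acc, Err};
            ({accepted, _Revs}, {Ok, Acc, Err}) -> {Ok, Acc + 1, Err};
            ({error, _Reason}, {Ok, Acc, Err}) -> {Ok, Acc, Err + 1}
        end, {0, 0, 0}, Results),
        {Health, Counts}.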