author     Paul J. Davis <paul.joseph.davis@gmail.com>    2018-04-24 12:27:43 -0500
committer  jiangph <jiangph@cn.ibm.com>                   2018-08-22 00:59:16 +0800
commit     932dd93b0e6edaa5ba2eb3775925301e950657ee (patch)
tree       e4316afdb45c1bd34a84870d6d6a206e4deb318c
parent     0234679661383fd71dd7d15a51d4454d138f0e90 (diff)
download   couchdb-932dd93b0e6edaa5ba2eb3775925301e950657ee.tar.gz
[09/10] Clustered Purge: Fabric API
This commit implements the clustered API for performing purge requests. The
change should be fairly straightforward for anyone already familiar with the
general implementation of a fabric coordinator, given that the purge API is
fairly simple.

COUCHDB-3326

Co-authored-by: Mayya Sharipova <mayyas@ca.ibm.com>
Co-authored-by: jiangphcn <jiangph@cn.ibm.com>
-rw-r--r--  src/fabric/src/fabric.erl             27
-rw-r--r--  src/fabric/src/fabric_db_info.erl     29
-rw-r--r--  src/fabric/src/fabric_db_meta.erl     26
-rw-r--r--  src/fabric/src/fabric_doc_purge.erl  572
4 files changed, 638 insertions(+), 16 deletions(-)
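Before the diff itself, a minimal sketch of how a caller might exercise the clustered purge API introduced below; the database name, document id, and revision are placeholder values, and the return shapes follow the specs added in this commit:

    % Placeholder inputs; any clustered database and existing revision would do.
    DbName = <<"mydb">>,
    IdsRevs = [{<<"doc1">>, [<<"1-967a00dff5e02add41819138abb3284d">>]}],

    % Purge through the new fabric coordinator. Health is ok | accepted | error,
    % and Results carries one {ok | accepted, Revs} or {error, Reason} per request.
    {Health, Results} = fabric:purge_docs(DbName, IdsRevs, []),

    % Per-database upper bound on the number of stored purge requests.
    ok = fabric:set_purge_infos_limit(DbName, 1000, []),
    Limit = fabric:get_purge_infos_limit(DbName).

With no explicit w option, purge_docs/3 falls back to mem3:quorum/1 for its write quorum (see w/2 in fabric_doc_purge.erl below).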
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 5ad9b459d..cf9ad9428 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -21,12 +21,13 @@
delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
set_security/2, set_security/3, get_revs_limit/1, get_security/1,
get_security/2, get_all_security/1, get_all_security/2,
+ get_purge_infos_limit/1, set_purge_infos_limit/3,
compact/1, compact/2]).
% Documents
-export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3,
get_missing_revs/2, get_missing_revs/3, update_doc/3, update_docs/3,
- purge_docs/2, att_receiver/2]).
+ purge_docs/3, att_receiver/2]).
% Views
-export([all_docs/4, all_docs/5, changes/4, query_view/3, query_view/4,
@@ -137,6 +138,18 @@ set_security(DbName, SecObj) ->
set_security(DbName, SecObj, Options) ->
fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)).
+%% @doc sets the upper bound for the number of stored purge requests
+-spec set_purge_infos_limit(dbname(), pos_integer(), [option()]) -> ok.
+set_purge_infos_limit(DbName, Limit, Options)
+ when is_integer(Limit), Limit > 0 ->
+ fabric_db_meta:set_purge_infos_limit(dbname(DbName), Limit, opts(Options)).
+
+%% @doc retrieves the upper bound for the number of stored purge requests
+-spec get_purge_infos_limit(dbname()) -> pos_integer() | no_return().
+get_purge_infos_limit(DbName) ->
+ {ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]),
+ try couch_db:get_purge_infos_limit(Db) after catch couch_db:close(Db) end.
+
get_security(DbName) ->
get_security(DbName, [?ADMIN_CTX]).
@@ -271,8 +284,16 @@ update_docs(DbName, Docs, Options) ->
{aborted, PreCommitFailures}
end.
-purge_docs(_DbName, _IdsRevs) ->
- not_implemented.
+
+%% @doc purge revisions for a list '{Id, Revs}'
+%% returns {ok, {PurgeSeq, Results}}
+-spec purge_docs(dbname(), [{docid(), [revision()]}], [option()]) ->
+ {ok, [{Health, [revision()]}] | {error, any()}} when
+ Health :: ok | accepted.
+purge_docs(DbName, IdsRevs, Options) when is_list(IdsRevs) ->
+ IdsRevs2 = [idrevs(IdRs) || IdRs <- IdsRevs],
+ fabric_doc_purge:go(dbname(DbName), IdsRevs2, opts(Options)).
+
%% @doc spawns a process to upload attachment data and
%% returns a fabric attachment receiver context tuple
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index 98e8e52e4..97a31c237 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -23,10 +23,12 @@ go(DbName) ->
RexiMon = fabric_util:create_monitors(Shards),
Fun = fun handle_message/3,
{ok, ClusterInfo} = get_cluster_info(Shards),
- Acc0 = {fabric_dict:init(Workers, nil), [{cluster, ClusterInfo}]},
+ Acc0 = {fabric_dict:init(Workers, nil), [], [{cluster, ClusterInfo}]},
try
case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
- {ok, Acc} -> {ok, Acc};
+
+ {ok, Acc} ->
+ {ok, Acc};
{timeout, {WorkersDict, _}} ->
DefunctWorkers = fabric_util:remove_done_workers(
WorkersDict,
@@ -37,44 +39,49 @@ go(DbName) ->
"get_db_info"
),
{error, timeout};
- {error, Error} -> throw(Error)
+ {error, Error} ->
+ throw(Error)
end
after
rexi_monitor:stop(RexiMon)
end.
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
+handle_message({rexi_DOWN,
+ _, {_,NodeRef},_}, _Shard, {Counters, PseqAcc, Acc}) ->
case fabric_util:remove_down_workers(Counters, NodeRef) of
{ok, NewCounters} ->
- {ok, {NewCounters, Acc}};
+ {ok, {NewCounters, PseqAcc, Acc}};
error ->
{error, {nodedown, <<"progress not possible">>}}
end;
-handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, PseqAcc, Acc}) ->
NewCounters = fabric_dict:erase(Shard, Counters),
case fabric_view:is_progress_possible(NewCounters) of
true ->
- {ok, {NewCounters, Acc}};
+ {ok, {NewCounters, PseqAcc, Acc}};
false ->
{error, Reason}
end;
-handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
+handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, PseqAcc, Acc}) ->
case fabric_dict:lookup_element(Shard, Counters) of
undefined ->
% already heard from someone else in this range
- {ok, {Counters, Acc}};
+ {ok, {Counters, PseqAcc, Acc}};
nil ->
Seq = couch_util:get_value(update_seq, Info),
C1 = fabric_dict:store(Shard, Seq, Counters),
C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+ PSeq = couch_util:get_value(purge_seq, Info),
+ NewPseqAcc = [{Shard, PSeq}|PseqAcc],
case fabric_dict:any(nil, C2) of
true ->
- {ok, {C2, [Info|Acc]}};
+ {ok, {C2, NewPseqAcc, [Info|Acc]}};
false ->
{stop, [
{db_name,Name},
+ {purge_seq, fabric_view_changes:pack_seqs(NewPseqAcc)},
{update_seq, fabric_view_changes:pack_seqs(C2)} |
merge_results(lists:flatten([Info|Acc]))
]}
@@ -91,8 +98,6 @@ merge_results(Info) ->
[{doc_count, lists:sum(X)} | Acc];
(doc_del_count, X, Acc) ->
[{doc_del_count, lists:sum(X)} | Acc];
- (purge_seq, X, Acc) ->
- [{purge_seq, lists:sum(X)} | Acc];
(compact_running, X, Acc) ->
[{compact_running, lists:member(true, X)} | Acc];
(disk_size, X, Acc) -> % legacy
diff --git a/src/fabric/src/fabric_db_meta.erl b/src/fabric/src/fabric_db_meta.erl
index 367ef06e9..26e1b3752 100644
--- a/src/fabric/src/fabric_db_meta.erl
+++ b/src/fabric/src/fabric_db_meta.erl
@@ -12,7 +12,8 @@
-module(fabric_db_meta).
--export([set_revs_limit/3, set_security/3, get_all_security/2]).
+-export([set_revs_limit/3, set_security/3, get_all_security/2,
+ set_purge_infos_limit/3]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
@@ -48,6 +49,29 @@ handle_revs_message(Error, _, _Acc) ->
{error, Error}.
+set_purge_infos_limit(DbName, Limit, Options) ->
+ Shards = mem3:shards(DbName),
+ Workers = fabric_util:submit_jobs(Shards, set_purge_infos_limit, [Limit, Options]),
+ Handler = fun handle_purge_message/3,
+ Acc0 = {Workers, length(Workers) - 1},
+ case fabric_util:recv(Workers, #shard.ref, Handler, Acc0) of
+ {ok, ok} ->
+ ok;
+ {timeout, {DefunctWorkers, _}} ->
+ fabric_util:log_timeout(DefunctWorkers, "set_purged_docs_limit"),
+ {error, timeout};
+ Error ->
+ Error
+ end.
+
+handle_purge_message(ok, _, {_Workers, 0}) ->
+ {stop, ok};
+handle_purge_message(ok, Worker, {Workers, Waiting}) ->
+ {ok, {lists:delete(Worker, Workers), Waiting - 1}};
+handle_purge_message(Error, _, _Acc) ->
+ {error, Error}.
+
+
set_security(DbName, SecObj, Options) ->
Shards = mem3:shards(DbName),
RexiMon = fabric_util:create_monitors(Shards),
diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl
new file mode 100644
index 000000000..2571d0d7f
--- /dev/null
+++ b/src/fabric/src/fabric_doc_purge.erl
@@ -0,0 +1,572 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_purge).
+
+
+-export([
+ go/3
+]).
+
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+-record(acc, {
+ worker_uuids,
+ resps,
+ uuid_counts,
+ w
+}).
+
+
+go(_, [], _) ->
+ {ok, []};
+go(DbName, IdsRevs, Options) ->
+ % Generate our purge requests of {UUID, DocId, Revs}
+ {UUIDs, Reqs} = create_reqs(IdsRevs, [], []),
+
+ % Fire off rexi workers for each shard.
+ {Workers, WorkerUUIDs} = dict:fold(fun(Shard, ShardReqs, {Ws, WUUIDs}) ->
+ #shard{name = ShardDbName, node = Node} = Shard,
+ Args = [ShardDbName, ShardReqs, Options],
+ Ref = rexi:cast(Node, {fabric_rpc, purge_docs, Args}),
+ Worker = Shard#shard{ref=Ref},
+ ShardUUIDs = [UUID || {UUID, _Id, _Revs} <- ShardReqs],
+ {[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]}
+ end, {[], []}, group_reqs_by_shard(DbName, Reqs)),
+
+ UUIDCounts = lists:foldl(fun({_Worker, WUUIDs}, CountAcc) ->
+ lists:foldl(fun(UUID, InnerCountAcc) ->
+ dict:update_counter(UUID, 1, InnerCountAcc)
+ end, CountAcc, WUUIDs)
+ end, dict:new(), WorkerUUIDs),
+
+ RexiMon = fabric_util:create_monitors(Workers),
+ Timeout = fabric_util:request_timeout(),
+ Acc0 = #acc{
+ worker_uuids = WorkerUUIDs,
+ resps = dict:from_list([{UUID, []} || UUID <- UUIDs]),
+ uuid_counts = UUIDCounts,
+ w = w(DbName, Options)
+ },
+ Acc2 = try rexi_utils:recv(Workers, #shard.ref,
+ fun handle_message/3, Acc0, infinity, Timeout) of
+ {ok, Acc1} ->
+ Acc1;
+ {timeout, Acc1} ->
+ #acc{
+ worker_uuids = WorkerUUIDs,
+ resps = Resps
+ } = Acc1,
+ DefunctWorkers = [Worker || {Worker, _} <- WorkerUUIDs],
+ fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
+ NewResps = append_errors(timeout, WorkerUUIDs, Resps),
+ Acc1#acc{worker_uuids = [], resps = NewResps};
+ Else ->
+ Else
+ after
+ rexi_monitor:stop(RexiMon)
+ end,
+
+ FinalResps = format_resps(UUIDs, Acc2),
+ {resp_health(FinalResps), FinalResps}.
+
+
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
+ #acc{
+ worker_uuids = WorkerUUIDs,
+ resps = Resps
+ } = Acc,
+ Pred = fun({#shard{node = N}, _}) -> N == Node end,
+ {Failed, Rest} = lists:partition(Pred, WorkerUUIDs),
+ NewResps = append_errors(internal_server_error, Failed, Resps),
+ maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({rexi_EXIT, _}, Worker, Acc) ->
+ #acc{
+ worker_uuids = WorkerUUIDs,
+ resps = Resps
+ } = Acc,
+ {value, WorkerPair, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+ NewResps = append_errors(internal_server_error, [WorkerPair], Resps),
+ maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({ok, Replies}, Worker, Acc) ->
+ #acc{
+ worker_uuids = WorkerUUIDs,
+ resps = Resps
+ } = Acc,
+ {value, {_W, UUIDs}, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
+ NewResps = append_resps(UUIDs, Replies, Resps),
+ maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
+
+handle_message({bad_request, Msg}, _, _) ->
+ throw({bad_request, Msg}).
+
+
+create_reqs([], UUIDs, Reqs) ->
+ {lists:reverse(UUIDs), lists:reverse(Reqs)};
+
+create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs) ->
+ UUID = couch_uuids:new(),
+ NewUUIDs = [UUID | UUIDs],
+ NewReqs = [{UUID, Id, Revs} | Reqs],
+ create_reqs(RestIdsRevs, NewUUIDs, NewReqs).
+
+
+group_reqs_by_shard(DbName, Reqs) ->
+ lists:foldl(fun({_UUID, Id, _Revs} = Req, D0) ->
+ lists:foldl(fun(Shard, D1) ->
+ dict:append(Shard, Req, D1)
+ end, D0, mem3:shards(DbName, Id))
+ end, dict:new(), Reqs).
+
+
+w(DbName, Options) ->
+ try
+ list_to_integer(couch_util:get_value(w, Options))
+ catch _:_ ->
+ mem3:quorum(DbName)
+ end.
+
+
+append_errors(Type, WorkerUUIDs, Resps) ->
+ lists:foldl(fun({_Worker, UUIDs}, RespAcc) ->
+ Errors = [{error, Type} || _UUID <- UUIDs],
+ append_resps(UUIDs, Errors, RespAcc)
+ end, Resps, WorkerUUIDs).
+
+
+append_resps([], [], Resps) ->
+ Resps;
+append_resps([UUID | RestUUIDs], [Reply | RestReplies], Resps) ->
+ NewResps = dict:append(UUID, Reply, Resps),
+ append_resps(RestUUIDs, RestReplies, NewResps).
+
+
+maybe_stop(#acc{worker_uuids = []} = Acc) ->
+ {stop, Acc};
+maybe_stop(#acc{resps = Resps, uuid_counts = Counts, w = W} = Acc) ->
+ try
+ dict:fold(fun(UUID, UUIDResps, _) ->
+ UUIDCount = dict:fetch(UUID, Counts),
+ case has_quorum(UUIDResps, UUIDCount, W) of
+ true -> ok;
+ false -> throw(keep_going)
+ end
+ end, nil, Resps),
+ {stop, Acc}
+ catch throw:keep_going ->
+ {ok, Acc}
+ end.
+
+
+format_resps(UUIDs, #acc{} = Acc) ->
+ #acc{
+ resps = Resps,
+ w = W
+ } = Acc,
+ FoldFun = fun(UUID, Replies, ReplyAcc) ->
+ OkReplies = [Reply || {ok, Reply} <- Replies],
+ case OkReplies of
+ [] ->
+ [Error | _] = lists:usort(Replies),
+ [{UUID, Error} | ReplyAcc];
+ _ ->
+ AllRevs = lists:usort(lists:flatten(OkReplies)),
+ IsOk = length(OkReplies) >= W
+ andalso length(lists:usort(OkReplies)) == 1,
+ Health = if IsOk -> ok; true -> accepted end,
+ [{UUID, {Health, AllRevs}} | ReplyAcc]
+ end
+ end,
+ FinalReplies = dict:fold(FoldFun, {ok, []}, Resps),
+ couch_util:reorder_results(UUIDs, FinalReplies);
+
+format_resps(_UUIDs, Else) ->
+ Else.
+
+
+resp_health(Resps) ->
+ Healths = lists:usort([H || {H, _} <- Resps]),
+ HasError = lists:member(error, Healths),
+ HasAccepted = lists:member(accepted, Healths),
+ AllOk = Healths == [ok],
+ if
+ HasError -> error;
+ HasAccepted -> accepted;
+ AllOk -> ok;
+ true -> error
+ end.
+
+
+has_quorum(Resps, Count, W) ->
+ OkResps = [R || {ok, _} = R <- Resps],
+ OkCounts = lists:foldl(fun(R, Acc) ->
+ orddict:update_counter(R, 1, Acc)
+ end, orddict:new(), OkResps),
+ MaxOk = lists:max([0 | element(2, lists:unzip(OkCounts))]),
+ if
+ MaxOk >= W -> true;
+ length(Resps) >= Count -> true;
+ true -> false
+ end.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+purge_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ t_w2_ok(),
+ t_w3_ok(),
+
+ t_w2_mixed_accepted(),
+ t_w3_mixed_accepted(),
+
+ t_w2_exit1_ok(),
+ t_w2_exit2_accepted(),
+ t_w2_exit3_error(),
+
+ t_w4_accepted(),
+
+ t_mixed_ok_accepted(),
+ t_mixed_errors()
+ ]
+ }.
+
+
+setup() ->
+ meck:new(couch_log),
+ meck:expect(couch_log, warning, fun(_, _) -> ok end),
+ meck:expect(couch_log, notice, fun(_, _) -> ok end).
+
+
+teardown(_) ->
+ meck:unload().
+
+
+t_w2_ok() ->
+ ?_test(begin
+ Acc0 = create_init_acc(2),
+ Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+ {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {stop, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, true),
+
+ Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(ok, resp_health(Resps))
+ end).
+
+
+t_w3_ok() ->
+ ?_test(begin
+ Acc0 = create_init_acc(3),
+ Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+ {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(ok, resp_health(Resps))
+ end).
+
+
+t_w2_mixed_accepted() ->
+ ?_test(begin
+ Acc0 = create_init_acc(2),
+ Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
+ Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
+
+ {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [
+ {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
+ {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
+ ],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(accepted, resp_health(Resps))
+ end).
+
+
+t_w3_mixed_accepted() ->
+ ?_test(begin
+ Acc0 = create_init_acc(3),
+ Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
+ Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
+
+ {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [
+ {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
+ {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
+ ],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(accepted, resp_health(Resps))
+ end).
+
+
+t_w2_exit1_ok() ->
+ ?_test(begin
+ Acc0 = create_init_acc(2),
+ Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+ ExitMsg = {rexi_EXIT, blargh},
+
+ {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(ok, resp_health(Resps))
+ end).
+
+
+t_w2_exit2_accepted() ->
+ ?_test(begin
+ Acc0 = create_init_acc(2),
+ Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+ ExitMsg = {rexi_EXIT, blargh},
+
+ {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(accepted, resp_health(Resps))
+ end).
+
+
+t_w2_exit3_error() ->
+ ?_test(begin
+ Acc0 = create_init_acc(2),
+ ExitMsg = {rexi_EXIT, blargh},
+
+ {ok, Acc1} = handle_message(ExitMsg, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [
+ {error, internal_server_error},
+ {error, internal_server_error}
+ ],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(error, resp_health(Resps))
+ end).
+
+
+t_w4_accepted() ->
+ % Make sure we return when all workers have responded
+ % rather than wait around for a timeout if a user asks
+ % for a quorum with more than the available number of
+ % shards.
+ ?_test(begin
+ Acc0 = create_init_acc(4),
+ Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
+
+ {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+ ?assertEqual(2, length(Acc1#acc.worker_uuids)),
+ check_quorum(Acc1, false),
+
+ {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+ ?assertEqual(1, length(Acc2#acc.worker_uuids)),
+ check_quorum(Acc2, false),
+
+ {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
+ ?assertEqual(0, length(Acc3#acc.worker_uuids)),
+ check_quorum(Acc3, true),
+
+ Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(accepted, resp_health(Resps))
+ end).
+
+
+t_mixed_ok_accepted() ->
+ ?_test(begin
+ WorkerUUIDs = [
+ {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
+ {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
+ {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
+
+ {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
+ {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
+ {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
+ ],
+
+ Acc0 = #acc{
+ worker_uuids = WorkerUUIDs,
+ resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+ uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+ w = 2
+ },
+
+ Msg1 = {ok, [{ok, [{1, <<"foo">>}]}]},
+ Msg2 = {ok, [{ok, [{2, <<"bar">>}]}]},
+ ExitMsg = {rexi_EXIT, blargh},
+
+ {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
+ {ok, Acc2} = handle_message(Msg1, worker(2, Acc0), Acc1),
+ {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
+ {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
+ {stop, Acc5} = handle_message(Msg2, worker(6, Acc0), Acc4),
+
+ Expect = [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(accepted, resp_health(Resps))
+ end).
+
+
+t_mixed_errors() ->
+ ?_test(begin
+ WorkerUUIDs = [
+ {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
+ {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
+ {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
+
+ {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
+ {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
+ {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
+ ],
+
+ Acc0 = #acc{
+ worker_uuids = WorkerUUIDs,
+ resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+ uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+ w = 2
+ },
+
+ Msg = {ok, [{ok, [{1, <<"foo">>}]}]},
+ ExitMsg = {rexi_EXIT, blargh},
+
+ {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
+ {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
+ {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
+ {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
+ {stop, Acc5} = handle_message(ExitMsg, worker(6, Acc0), Acc4),
+
+ Expect = [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}],
+ Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
+ ?assertEqual(Expect, Resps),
+ ?assertEqual(error, resp_health(Resps))
+ end).
+
+
+create_init_acc(W) ->
+ UUID1 = <<"uuid1">>,
+ UUID2 = <<"uuid2">>,
+
+ Nodes = [node1, node2, node3],
+ Shards = mem3_util:create_partition_map(<<"foo">>, 3, 1, Nodes),
+
+ % Create our worker_uuids. We're relying on the fact that
+ % we're using a fake Q=1 db so we don't have to worry
+ % about any hashing here.
+ WorkerUUIDs = lists:map(fun(Shard) ->
+ {Shard#shard{ref = erlang:make_ref()}, [UUID1, UUID2]}
+ end, Shards),
+
+ #acc{
+ worker_uuids = WorkerUUIDs,
+ resps = dict:from_list([{UUID1, []}, {UUID2, []}]),
+ uuid_counts = dict:from_list([{UUID1, 3}, {UUID2, 3}]),
+ w = W
+ }.
+
+
+worker(N, #acc{worker_uuids = WorkerUUIDs}) ->
+ {Worker, _} = lists:nth(N, WorkerUUIDs),
+ Worker.
+
+
+check_quorum(Acc, Expect) ->
+ dict:fold(fun(_Shard, Resps, _) ->
+ ?assertEqual(Expect, has_quorum(Resps, 3, Acc#acc.w))
+ end, nil, Acc#acc.resps).
+
+-endif.