diff options
Diffstat (limited to 'src/fabric/src/fabric_doc_purge.erl')
-rw-r--r-- | src/fabric/src/fabric_doc_purge.erl | 571 |
1 files changed, 0 insertions, 571 deletions
diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl deleted file mode 100644 index 3492f88c5..000000000 --- a/src/fabric/src/fabric_doc_purge.erl +++ /dev/null @@ -1,571 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_doc_purge). - - --export([ - go/3 -]). - - --include_lib("fabric/include/fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - - --record(acc, { - worker_uuids, - resps, - uuid_counts, - w -}). - - -go(_, [], _) -> - {ok, []}; -go(DbName, IdsRevs, Options) -> - % Generate our purge requests of {UUID, DocId, Revs} - {UUIDs, Reqs} = create_reqs(IdsRevs, [], []), - - % Fire off rexi workers for each shard. - {Workers, WorkerUUIDs} = dict:fold(fun(Shard, ShardReqs, {Ws, WUUIDs}) -> - #shard{name = ShardDbName, node = Node} = Shard, - Args = [ShardDbName, ShardReqs, Options], - Ref = rexi:cast(Node, {fabric_rpc, purge_docs, Args}), - Worker = Shard#shard{ref=Ref}, - ShardUUIDs = [UUID || {UUID, _Id, _Revs} <- ShardReqs], - {[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]} - end, {[], []}, group_reqs_by_shard(DbName, Reqs)), - - UUIDCounts = lists:foldl(fun({_Worker, WUUIDs}, CountAcc) -> - lists:foldl(fun(UUID, InnerCountAcc) -> - dict:update_counter(UUID, 1, InnerCountAcc) - end, CountAcc, WUUIDs) - end, dict:new(), WorkerUUIDs), - - RexiMon = fabric_util:create_monitors(Workers), - Timeout = fabric_util:request_timeout(), - Acc0 = #acc{ - worker_uuids = WorkerUUIDs, - resps = dict:from_list([{UUID, []} || UUID <- UUIDs]), - uuid_counts = UUIDCounts, - w = w(DbName, Options) - }, - Acc2 = try rexi_utils:recv(Workers, #shard.ref, - fun handle_message/3, Acc0, infinity, Timeout) of - {ok, Acc1} -> - Acc1; - {timeout, Acc1} -> - #acc{ - worker_uuids = WorkerUUIDs, - resps = Resps - } = Acc1, - DefunctWorkers = [Worker || {Worker, _} <- WorkerUUIDs], - fabric_util:log_timeout(DefunctWorkers, "purge_docs"), - NewResps = append_errors(timeout, WorkerUUIDs, Resps), - Acc1#acc{worker_uuids = [], resps = NewResps}; - Else -> - Else - after - rexi_monitor:stop(RexiMon) - end, - - FinalResps = format_resps(UUIDs, Acc2), - {resp_health(FinalResps), FinalResps}. - - -handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) -> - #acc{ - worker_uuids = WorkerUUIDs, - resps = Resps - } = Acc, - Pred = fun({#shard{node = N}, _}) -> N == Node end, - {Failed, Rest} = lists:partition(Pred, WorkerUUIDs), - NewResps = append_errors(internal_server_error, Failed, Resps), - maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps}); - -handle_message({rexi_EXIT, _}, Worker, Acc) -> - #acc{ - worker_uuids = WorkerUUIDs, - resps = Resps - } = Acc, - {value, WorkerPair, Rest} = lists:keytake(Worker, 1, WorkerUUIDs), - NewResps = append_errors(internal_server_error, [WorkerPair], Resps), - maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps}); - -handle_message({ok, Replies}, Worker, Acc) -> - #acc{ - worker_uuids = WorkerUUIDs, - resps = Resps - } = Acc, - {value, {_W, UUIDs}, Rest} = lists:keytake(Worker, 1, WorkerUUIDs), - NewResps = append_resps(UUIDs, Replies, Resps), - maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps}); - -handle_message({bad_request, Msg}, _, _) -> - throw({bad_request, Msg}). - - -create_reqs([], UUIDs, Reqs) -> - {lists:reverse(UUIDs), lists:reverse(Reqs)}; - -create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs) -> - UUID = couch_uuids:new(), - NewUUIDs = [UUID | UUIDs], - NewReqs = [{UUID, Id, Revs} | Reqs], - create_reqs(RestIdsRevs, NewUUIDs, NewReqs). - - -group_reqs_by_shard(DbName, Reqs) -> - lists:foldl(fun({_UUID, Id, _Revs} = Req, D0) -> - lists:foldl(fun(Shard, D1) -> - dict:append(Shard, Req, D1) - end, D0, mem3:shards(DbName, Id)) - end, dict:new(), Reqs). - - -w(DbName, Options) -> - try - list_to_integer(couch_util:get_value(w, Options)) - catch _:_ -> - mem3:quorum(DbName) - end. - - -append_errors(Type, WorkerUUIDs, Resps) -> - lists:foldl(fun({_Worker, UUIDs}, RespAcc) -> - Errors = [{error, Type} || _UUID <- UUIDs], - append_resps(UUIDs, Errors, RespAcc) - end, Resps, WorkerUUIDs). - - -append_resps([], [], Resps) -> - Resps; -append_resps([UUID | RestUUIDs], [Reply | RestReplies], Resps) -> - NewResps = dict:append(UUID, Reply, Resps), - append_resps(RestUUIDs, RestReplies, NewResps). - - -maybe_stop(#acc{worker_uuids = []} = Acc) -> - {stop, Acc}; -maybe_stop(#acc{resps = Resps, uuid_counts = Counts, w = W} = Acc) -> - try - dict:fold(fun(UUID, UUIDResps, _) -> - UUIDCount = dict:fetch(UUID, Counts), - case has_quorum(UUIDResps, UUIDCount, W) of - true -> ok; - false -> throw(keep_going) - end - end, nil, Resps), - {stop, Acc} - catch throw:keep_going -> - {ok, Acc} - end. - - -format_resps(UUIDs, #acc{} = Acc) -> - #acc{ - resps = Resps, - w = W - } = Acc, - FoldFun = fun(UUID, Replies, ReplyAcc) -> - OkReplies = [Reply || {ok, Reply} <- Replies], - case OkReplies of - [] -> - [Error | _] = lists:usort(Replies), - [{UUID, Error} | ReplyAcc]; - _ -> - AllRevs = lists:usort(lists:flatten(OkReplies)), - IsOk = length(OkReplies) >= W - andalso length(lists:usort(OkReplies)) == 1, - Health = if IsOk -> ok; true -> accepted end, - [{UUID, {Health, AllRevs}} | ReplyAcc] - end - end, - FinalReplies = dict:fold(FoldFun, [], Resps), - couch_util:reorder_results(UUIDs, FinalReplies); - -format_resps(_UUIDs, Else) -> - Else. - - -resp_health(Resps) -> - Healths = lists:usort([H || {H, _} <- Resps]), - HasError = lists:member(error, Healths), - HasAccepted = lists:member(accepted, Healths), - AllOk = Healths == [ok], - if - HasError -> error; - HasAccepted -> accepted; - AllOk -> ok; - true -> error - end. - - -has_quorum(Resps, Count, W) -> - OkResps = [R || {ok, _} = R <- Resps], - OkCounts = lists:foldl(fun(R, Acc) -> - orddict:update_counter(R, 1, Acc) - end, orddict:new(), OkResps), - MaxOk = lists:max([0 | element(2, lists:unzip(OkCounts))]), - if - MaxOk >= W -> true; - length(Resps) >= Count -> true; - true -> false - end. - - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - -purge_test_() -> - { - setup, - fun setup/0, - fun teardown/1, - [ - t_w2_ok(), - t_w3_ok(), - - t_w2_mixed_accepted(), - t_w3_mixed_accepted(), - - t_w2_exit1_ok(), - t_w2_exit2_accepted(), - t_w2_exit3_error(), - - t_w4_accepted(), - - t_mixed_ok_accepted(), - t_mixed_errors() - ] - }. - - -setup() -> - meck:new(couch_log), - meck:expect(couch_log, warning, fun(_, _) -> ok end), - meck:expect(couch_log, notice, fun(_, _) -> ok end). - - -teardown(_) -> - meck:unload(). - - -t_w2_ok() -> - ?_test(begin - Acc0 = create_init_acc(2), - Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, - - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {stop, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, true), - - Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2), - ?assertEqual(Expect, Resps), - ?assertEqual(ok, resp_health(Resps)) - end). - - -t_w3_ok() -> - ?_test(begin - Acc0 = create_init_acc(3), - Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, - - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), - ?assertEqual(Expect, Resps), - ?assertEqual(ok, resp_health(Resps)) - end). - - -t_w2_mixed_accepted() -> - ?_test(begin - Acc0 = create_init_acc(2), - Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]}, - Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]}, - - {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [ - {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]}, - {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]} - ], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2), - ?assertEqual(Expect, Resps), - ?assertEqual(accepted, resp_health(Resps)) - end). - - -t_w3_mixed_accepted() -> - ?_test(begin - Acc0 = create_init_acc(3), - Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]}, - Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]}, - - {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [ - {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]}, - {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]} - ], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2), - ?assertEqual(Expect, Resps), - ?assertEqual(accepted, resp_health(Resps)) - end). - - -t_w2_exit1_ok() -> - ?_test(begin - Acc0 = create_init_acc(2), - Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, - ExitMsg = {rexi_EXIT, blargh}, - - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), - ?assertEqual(Expect, Resps), - ?assertEqual(ok, resp_health(Resps)) - end). - - -t_w2_exit2_accepted() -> - ?_test(begin - Acc0 = create_init_acc(2), - Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, - ExitMsg = {rexi_EXIT, blargh}, - - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), - ?assertEqual(Expect, Resps), - ?assertEqual(accepted, resp_health(Resps)) - end). - - -t_w2_exit3_error() -> - ?_test(begin - Acc0 = create_init_acc(2), - ExitMsg = {rexi_EXIT, blargh}, - - {ok, Acc1} = handle_message(ExitMsg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [ - {error, internal_server_error}, - {error, internal_server_error} - ], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), - ?assertEqual(Expect, Resps), - ?assertEqual(error, resp_health(Resps)) - end). - - -t_w4_accepted() -> - % Make sure we return when all workers have responded - % rather than wait around for a timeout if a user asks - % for a qourum with more than the available number of - % shards. - ?_test(begin - Acc0 = create_init_acc(4), - Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, - - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), - ?assertEqual(Expect, Resps), - ?assertEqual(accepted, resp_health(Resps)) - end). - - -t_mixed_ok_accepted() -> - ?_test(begin - WorkerUUIDs = [ - {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]}, - - {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]} - ], - - Acc0 = #acc{ - worker_uuids = WorkerUUIDs, - resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), - uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), - w = 2 - }, - - Msg1 = {ok, [{ok, [{1, <<"foo">>}]}]}, - Msg2 = {ok, [{ok, [{2, <<"bar">>}]}]}, - ExitMsg = {rexi_EXIT, blargh}, - - {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), - {ok, Acc2} = handle_message(Msg1, worker(2, Acc0), Acc1), - {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2), - {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3), - {stop, Acc5} = handle_message(Msg2, worker(6, Acc0), Acc4), - - Expect = [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5), - ?assertEqual(Expect, Resps), - ?assertEqual(accepted, resp_health(Resps)) - end). - - -t_mixed_errors() -> - ?_test(begin - WorkerUUIDs = [ - {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]}, - - {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]} - ], - - Acc0 = #acc{ - worker_uuids = WorkerUUIDs, - resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), - uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), - w = 2 - }, - - Msg = {ok, [{ok, [{1, <<"foo">>}]}]}, - ExitMsg = {rexi_EXIT, blargh}, - - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2), - {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3), - {stop, Acc5} = handle_message(ExitMsg, worker(6, Acc0), Acc4), - - Expect = [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5), - ?assertEqual(Expect, Resps), - ?assertEqual(error, resp_health(Resps)) - end). - - -create_init_acc(W) -> - UUID1 = <<"uuid1">>, - UUID2 = <<"uuid2">>, - - Nodes = [node1, node2, node3], - Shards = mem3_util:create_partition_map(<<"foo">>, 3, 1, Nodes), - - % Create our worker_uuids. We're relying on the fact that - % we're using a fake Q=1 db so we don't have to worry - % about any hashing here. - WorkerUUIDs = lists:map(fun(Shard) -> - {Shard#shard{ref = erlang:make_ref()}, [UUID1, UUID2]} - end, Shards), - - #acc{ - worker_uuids = WorkerUUIDs, - resps = dict:from_list([{UUID1, []}, {UUID2, []}]), - uuid_counts = dict:from_list([{UUID1, 3}, {UUID2, 3}]), - w = W - }. - - -worker(N, #acc{worker_uuids = WorkerUUIDs}) -> - {Worker, _} = lists:nth(N, WorkerUUIDs), - Worker. - - -check_quorum(Acc, Expect) -> - dict:fold(fun(_Shard, Resps, _) -> - ?assertEqual(Expect, has_quorum(Resps, 3, Acc#acc.w)) - end, nil, Acc#acc.resps). - --endif. |