diff options
Diffstat (limited to 'src/fabric/src/fabric_doc_update.erl')
-rw-r--r-- | src/fabric/src/fabric_doc_update.erl | 408 |
1 files changed, 229 insertions, 179 deletions
diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl index 69babc14b..62e180ae2 100644 --- a/src/fabric/src/fabric_doc_update.erl +++ b/src/fabric/src/fabric_doc_update.erl @@ -25,48 +25,53 @@ go(DbName, AllDocs0, Opts) -> AllDocs = tag_docs(AllDocs1), validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)), Options = lists:delete(all_or_nothing, Opts), - GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) -> - Docs1 = untag_docs(Docs), - Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name,Docs1,Options]}), - {Shard#shard{ref=Ref}, Docs} - end, group_docs_by_shard(DbName, AllDocs)), + GroupedDocs = lists:map( + fun({#shard{name = Name, node = Node} = Shard, Docs}) -> + Docs1 = untag_docs(Docs), + Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs1, Options]}), + {Shard#shard{ref = Ref}, Docs} + end, + group_docs_by_shard(DbName, AllDocs) + ), {Workers, _} = lists:unzip(GroupedDocs), RexiMon = fabric_util:create_monitors(Workers), W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))), - Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs, - dict:new()}, + Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs, dict:new()}, Timeout = fabric_util:request_timeout(), try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of - {ok, {Health, Results}} - when Health =:= ok; Health =:= accepted; Health =:= error -> - {Health, [R || R <- couch_util:reorder_results(AllDocs, Results), R =/= noreply]}; - {timeout, Acc} -> - {_, _, W1, GroupedDocs1, DocReplDict} = Acc, - {DefunctWorkers, _} = lists:unzip(GroupedDocs1), - fabric_util:log_timeout(DefunctWorkers, "update_docs"), - {Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []}, - DocReplDict), - {Health, [R || R <- couch_util:reorder_results(AllDocs, Resp), R =/= noreply]}; - Else -> - Else + {ok, {Health, Results}} when + Health =:= ok; Health =:= accepted; Health =:= error + -> + {Health, [R || R <- couch_util:reorder_results(AllDocs, Results), R =/= noreply]}; + {timeout, Acc} -> + {_, _, W1, GroupedDocs1, DocReplDict} = Acc, + {DefunctWorkers, _} = lists:unzip(GroupedDocs1), + fabric_util:log_timeout(DefunctWorkers, "update_docs"), + {Health, _, Resp} = dict:fold( + fun force_reply/3, + {ok, W1, []}, + DocReplDict + ), + {Health, [R || R <- couch_util:reorder_results(AllDocs, Resp), R =/= noreply]}; + Else -> + Else after rexi_monitor:stop(RexiMon) end. -handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, Acc0) -> +handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, Acc0) -> {_, LenDocs, W, GroupedDocs, DocReplyDict} = Acc0, - NewGrpDocs = [X || {#shard{node=N}, _} = X <- GroupedDocs, N =/= NodeRef], + NewGrpDocs = [X || {#shard{node = N}, _} = X <- GroupedDocs, N =/= NodeRef], skip_message({length(NewGrpDocs), LenDocs, W, NewGrpDocs, DocReplyDict}); - handle_message({rexi_EXIT, _}, Worker, Acc0) -> - {WC,LenDocs,W,GrpDocs,DocReplyDict} = Acc0, - NewGrpDocs = lists:keydelete(Worker,1,GrpDocs), - skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict}); + {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0, + NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs), + skip_message({WC - 1, LenDocs, W, NewGrpDocs, DocReplyDict}); handle_message(internal_server_error, Worker, Acc0) -> % happens when we fail to load validation functions in an RPC worker - {WC,LenDocs,W,GrpDocs,DocReplyDict} = Acc0, - NewGrpDocs = lists:keydelete(Worker,1,GrpDocs), - skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict}); + {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0, + NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs), + skip_message({WC - 1, LenDocs, W, NewGrpDocs, DocReplyDict}); handle_message(attachment_chunk_received, _Worker, Acc0) -> {ok, Acc0}; handle_message({ok, Replies}, Worker, Acc0) -> @@ -74,21 +79,24 @@ handle_message({ok, Replies}, Worker, Acc0) -> {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs), DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), case {WaitingCount, dict:size(DocReplyDict)} of - {1, _} -> - % last message has arrived, we need to conclude things - {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, - DocReplyDict), - {stop, {Health, Reply}}; - {_, DocCount} -> - % we've got at least one reply for each document, let's take a look - case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of - continue -> - {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}}; - {stop, W, FinalReplies} -> - {stop, {ok, FinalReplies}} - end; - _ -> - {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}} + {1, _} -> + % last message has arrived, we need to conclude things + {Health, W, Reply} = dict:fold( + fun force_reply/3, + {ok, W, []}, + DocReplyDict + ), + {stop, {Health, Reply}}; + {_, DocCount} -> + % we've got at least one reply for each document, let's take a look + case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of + continue -> + {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}}; + {stop, W, FinalReplies} -> + {stop, {ok, FinalReplies}} + end; + _ -> + {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}} end; handle_message({missing_stub, Stub}, _, _) -> throw({missing_stub, Stub}); @@ -106,53 +114,61 @@ before_doc_update(DbName, Docs, Opts) -> {true, _} -> %% cluster db is expensive to create so we only do it if we have to Db = fabric_util:open_cluster_db(DbName, Opts), - [couch_replicator_docs:before_doc_update(Doc, Db, replicated_changes) - || Doc <- Docs]; + [ + couch_replicator_docs:before_doc_update(Doc, Db, replicated_changes) + || Doc <- Docs + ]; {_, true} -> %% cluster db is expensive to create so we only do it if we have to Db = fabric_util:open_cluster_db(DbName, Opts), - [couch_users_db:before_doc_update(Doc, Db, interactive_edit) - || Doc <- Docs]; + [ + couch_users_db:before_doc_update(Doc, Db, interactive_edit) + || Doc <- Docs + ]; _ -> Docs end. tag_docs([]) -> []; -tag_docs([#doc{meta=Meta}=Doc | Rest]) -> - [Doc#doc{meta=[{ref, make_ref()} | Meta]} | tag_docs(Rest)]. +tag_docs([#doc{meta = Meta} = Doc | Rest]) -> + [Doc#doc{meta = [{ref, make_ref()} | Meta]} | tag_docs(Rest)]. untag_docs([]) -> []; -untag_docs([#doc{meta=Meta}=Doc | Rest]) -> - [Doc#doc{meta=lists:keydelete(ref, 1, Meta)} | untag_docs(Rest)]. +untag_docs([#doc{meta = Meta} = Doc | Rest]) -> + [Doc#doc{meta = lists:keydelete(ref, 1, Meta)} | untag_docs(Rest)]. force_reply(Doc, [], {_, W, Acc}) -> {error, W, [{Doc, {error, internal_server_error}} | Acc]}; -force_reply(Doc, [FirstReply|_] = Replies, {Health, W, Acc}) -> +force_reply(Doc, [FirstReply | _] = Replies, {Health, W, Acc}) -> case update_quorum_met(W, Replies) of - {true, Reply} -> - {Health, W, [{Doc,Reply} | Acc]}; - false -> - case [Reply || {ok, Reply} <- Replies] of - [] -> - % check if all errors are identical, if so inherit health - case lists:all(fun(E) -> E =:= FirstReply end, Replies) of - true -> - CounterKey = [fabric, doc_update, errors], - couch_stats:increment_counter(CounterKey), - {Health, W, [{Doc, FirstReply} | Acc]}; - false -> - CounterKey = [fabric, doc_update, mismatched_errors], - couch_stats:increment_counter(CounterKey), - {error, W, [{Doc, FirstReply} | Acc]} - end; - [AcceptedRev | _] -> - CounterKey = [fabric, doc_update, write_quorum_errors], - couch_stats:increment_counter(CounterKey), - NewHealth = case Health of ok -> accepted; _ -> Health end, - {NewHealth, W, [{Doc, {accepted,AcceptedRev}} | Acc]} - end + {true, Reply} -> + {Health, W, [{Doc, Reply} | Acc]}; + false -> + case [Reply || {ok, Reply} <- Replies] of + [] -> + % check if all errors are identical, if so inherit health + case lists:all(fun(E) -> E =:= FirstReply end, Replies) of + true -> + CounterKey = [fabric, doc_update, errors], + couch_stats:increment_counter(CounterKey), + {Health, W, [{Doc, FirstReply} | Acc]}; + false -> + CounterKey = [fabric, doc_update, mismatched_errors], + couch_stats:increment_counter(CounterKey), + {error, W, [{Doc, FirstReply} | Acc]} + end; + [AcceptedRev | _] -> + CounterKey = [fabric, doc_update, write_quorum_errors], + couch_stats:increment_counter(CounterKey), + NewHealth = + case Health of + ok -> accepted; + _ -> Health + end, + {NewHealth, W, [{Doc, {accepted, AcceptedRev}} | Acc]} + end end. maybe_reply(_, _, continue) -> @@ -160,21 +176,24 @@ maybe_reply(_, _, continue) -> continue; maybe_reply(Doc, Replies, {stop, W, Acc}) -> case update_quorum_met(W, Replies) of - {true, Reply} -> - {stop, W, [{Doc, Reply} | Acc]}; - false -> - continue + {true, Reply} -> + {stop, W, [{Doc, Reply} | Acc]}; + false -> + continue end. update_quorum_met(W, Replies) -> - Counters = lists:foldl(fun(R,D) -> orddict:update_counter(R,1,D) end, - orddict:new(), Replies), + Counters = lists:foldl( + fun(R, D) -> orddict:update_counter(R, 1, D) end, + orddict:new(), + Replies + ), GoodReplies = lists:filter(fun good_reply/1, Counters), case lists:dropwhile(fun({_, Count}) -> Count < W end, GoodReplies) of - [] -> - false; - [{FinalReply, _} | _] -> - {true, FinalReply} + [] -> + false; + [{FinalReply, _} | _] -> + {true, FinalReply} end. good_reply({{ok, _}, _}) -> @@ -186,18 +205,28 @@ good_reply(_) -> -spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. group_docs_by_shard(DbName, Docs) -> - dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) -> - lists:foldl(fun(Shard, D1) -> - dict:append(Shard, Doc, D1) - end, D0, mem3:shards(DbName,Id)) - end, dict:new(), Docs)). + dict:to_list( + lists:foldl( + fun(#doc{id = Id} = Doc, D0) -> + lists:foldl( + fun(Shard, D1) -> + dict:append(Shard, Doc, D1) + end, + D0, + mem3:shards(DbName, Id) + ) + end, + dict:new(), + Docs + ) + ). append_update_replies([], [], DocReplyDict) -> DocReplyDict; -append_update_replies([Doc|Rest], [], Dict0) -> +append_update_replies([Doc | Rest], [], Dict0) -> % icky, if replicated_changes only errors show up in result append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0)); -append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) -> +append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) -> append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)). skip_message({0, _, W, _, DocReplyDict}) -> @@ -213,27 +242,29 @@ validate_atomic_update(_DbName, AllDocs, true) -> % to basically extract the prep_and_validate_updates function from couch_db % and only run that, without actually writing in case of a success. Error = {not_implemented, <<"all_or_nothing is not supported">>}, - PreCommitFailures = lists:map(fun(#doc{id=Id, revs = {Pos,Revs}}) -> - case Revs of [] -> RevId = <<>>; [RevId|_] -> ok end, - {{Id, {Pos, RevId}}, Error} - end, AllDocs), + PreCommitFailures = lists:map( + fun(#doc{id = Id, revs = {Pos, Revs}}) -> + case Revs of + [] -> RevId = <<>>; + [RevId | _] -> ok + end, + {{Id, {Pos, RevId}}, Error} + end, + AllDocs + ), throw({aborted, PreCommitFailures}). - -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). - setup_all() -> meck:new([couch_log, couch_stats]), - meck:expect(couch_log, warning, fun(_,_) -> ok end), + meck:expect(couch_log, warning, fun(_, _) -> ok end), meck:expect(couch_stats, increment_counter, fun(_) -> ok end). - teardown_all(_) -> meck:unload(). - doc_update_test_() -> { setup, @@ -246,132 +277,151 @@ doc_update_test_() -> ] }. - % eunits doc_update1() -> - Doc1 = #doc{revs = {1,[<<"foo">>]}}, - Doc2 = #doc{revs = {1,[<<"bar">>]}}, + Doc1 = #doc{revs = {1, [<<"foo">>]}}, + Doc2 = #doc{revs = {1, [<<"bar">>]}}, Docs = [Doc1], Docs2 = [Doc2, Doc1], - Dict = dict:from_list([{Doc,[]} || Doc <- Docs]), - Dict2 = dict:from_list([{Doc,[]} || Doc <- Docs2]), + Dict = dict:from_list([{Doc, []} || Doc <- Docs]), + Dict2 = dict:from_list([{Doc, []} || Doc <- Docs2]), Shards = - mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs), - + mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), + GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), % test for W = 2 - AccW2 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs, - Dict}, + AccW2 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs, Dict}, - {ok,{WaitingCountW2_1,_,_,_,_}=AccW2_1} = - handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW2), - ?assertEqual(WaitingCountW2_1,2), - {stop, FinalReplyW2 } = - handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW2_1), - ?assertEqual({ok, [{Doc1, {ok,Doc1}}]},FinalReplyW2), + {ok, {WaitingCountW2_1, _, _, _, _} = AccW2_1} = + handle_message({ok, [{ok, Doc1}]}, hd(Shards), AccW2), + ?assertEqual(WaitingCountW2_1, 2), + {stop, FinalReplyW2} = + handle_message({ok, [{ok, Doc1}]}, lists:nth(2, Shards), AccW2_1), + ?assertEqual({ok, [{Doc1, {ok, Doc1}}]}, FinalReplyW2), % test for W = 3 - AccW3 = {length(Shards), length(Docs), list_to_integer("3"), GroupedDocs, - Dict}, + AccW3 = {length(Shards), length(Docs), list_to_integer("3"), GroupedDocs, Dict}, - {ok,{WaitingCountW3_1,_,_,_,_}=AccW3_1} = - handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW3), - ?assertEqual(WaitingCountW3_1,2), + {ok, {WaitingCountW3_1, _, _, _, _} = AccW3_1} = + handle_message({ok, [{ok, Doc1}]}, hd(Shards), AccW3), + ?assertEqual(WaitingCountW3_1, 2), - {ok,{WaitingCountW3_2,_,_,_,_}=AccW3_2} = - handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW3_1), - ?assertEqual(WaitingCountW3_2,1), + {ok, {WaitingCountW3_2, _, _, _, _} = AccW3_2} = + handle_message({ok, [{ok, Doc1}]}, lists:nth(2, Shards), AccW3_1), + ?assertEqual(WaitingCountW3_2, 1), - {stop, FinalReplyW3 } = - handle_message({ok, [{ok, Doc1}]},lists:nth(3,Shards),AccW3_2), - ?assertEqual({ok, [{Doc1, {ok,Doc1}}]},FinalReplyW3), + {stop, FinalReplyW3} = + handle_message({ok, [{ok, Doc1}]}, lists:nth(3, Shards), AccW3_2), + ?assertEqual({ok, [{Doc1, {ok, Doc1}}]}, FinalReplyW3), % test w quorum > # shards, which should fail immediately - Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]), - GroupedDocs2 = group_docs_by_shard_hack(<<"foo">>,Shards2,Docs), + Shards2 = mem3_util:create_partition_map("foo", 1, 1, ["node1"]), + GroupedDocs2 = group_docs_by_shard_hack(<<"foo">>, Shards2, Docs), AccW4 = {length(Shards2), length(Docs), list_to_integer("2"), GroupedDocs2, Dict}, Bool = - case handle_message({ok, [{ok, Doc1}]},hd(Shards2),AccW4) of - {stop, _Reply} -> - true; - _ -> false - end, - ?assertEqual(Bool,true), + case handle_message({ok, [{ok, Doc1}]}, hd(Shards2), AccW4) of + {stop, _Reply} -> + true; + _ -> + false + end, + ?assertEqual(Bool, true), % Docs with no replies should end up as {error, internal_server_error} - SA1 = #shard{node=a, range=1}, - SB1 = #shard{node=b, range=1}, - SA2 = #shard{node=a, range=2}, - SB2 = #shard{node=b, range=2}, - GroupedDocs3 = [{SA1,[Doc1]}, {SB1,[Doc1]}, {SA2,[Doc2]}, {SB2,[Doc2]}], + SA1 = #shard{node = a, range = 1}, + SB1 = #shard{node = b, range = 1}, + SA2 = #shard{node = a, range = 2}, + SB2 = #shard{node = b, range = 2}, + GroupedDocs3 = [{SA1, [Doc1]}, {SB1, [Doc1]}, {SA2, [Doc2]}, {SB2, [Doc2]}], StW5_0 = {length(GroupedDocs3), length(Docs2), 2, GroupedDocs3, Dict2}, {ok, StW5_1} = handle_message({ok, [{ok, "A"}]}, SA1, StW5_0), {ok, StW5_2} = handle_message({rexi_EXIT, nil}, SB1, StW5_1), {ok, StW5_3} = handle_message({rexi_EXIT, nil}, SA2, StW5_2), {stop, ReplyW5} = handle_message({rexi_EXIT, nil}, SB2, StW5_3), ?assertEqual( - {error, [{Doc1,{accepted,"A"}},{Doc2,{error,internal_server_error}}]}, + {error, [{Doc1, {accepted, "A"}}, {Doc2, {error, internal_server_error}}]}, ReplyW5 ). doc_update2() -> - Doc1 = #doc{revs = {1,[<<"foo">>]}}, - Doc2 = #doc{revs = {1,[<<"bar">>]}}, + Doc1 = #doc{revs = {1, [<<"foo">>]}}, + Doc2 = #doc{revs = {1, [<<"bar">>]}}, Docs = [Doc2, Doc1], Shards = - mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs), - Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs, - dict:from_list([{Doc,[]} || Doc <- Docs])}, - - {ok,{WaitingCount1,_,_,_,_}=Acc1} = - handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0), - ?assertEqual(WaitingCount1,2), - - {ok,{WaitingCount2,_,_,_,_}=Acc2} = - handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1), - ?assertEqual(WaitingCount2,1), + mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), + GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), + Acc0 = { + length(Shards), + length(Docs), + list_to_integer("2"), + GroupedDocs, + dict:from_list([{Doc, []} || Doc <- Docs]) + }, + + {ok, {WaitingCount1, _, _, _, _} = Acc1} = + handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0), + ?assertEqual(WaitingCount1, 2), + + {ok, {WaitingCount2, _, _, _, _} = Acc2} = + handle_message({rexi_EXIT, 1}, lists:nth(2, Shards), Acc1), + ?assertEqual(WaitingCount2, 1), {stop, Reply} = - handle_message({rexi_EXIT, 1},lists:nth(3,Shards),Acc2), + handle_message({rexi_EXIT, 1}, lists:nth(3, Shards), Acc2), - ?assertEqual({accepted, [{Doc1,{accepted,Doc2}}, {Doc2,{accepted,Doc1}}]}, - Reply). + ?assertEqual( + {accepted, [{Doc1, {accepted, Doc2}}, {Doc2, {accepted, Doc1}}]}, + Reply + ). doc_update3() -> - Doc1 = #doc{revs = {1,[<<"foo">>]}}, - Doc2 = #doc{revs = {1,[<<"bar">>]}}, + Doc1 = #doc{revs = {1, [<<"foo">>]}}, + Doc2 = #doc{revs = {1, [<<"bar">>]}}, Docs = [Doc2, Doc1], Shards = - mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]), - GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs), - Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs, - dict:from_list([{Doc,[]} || Doc <- Docs])}, - - {ok,{WaitingCount1,_,_,_,_}=Acc1} = - handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0), - ?assertEqual(WaitingCount1,2), - - {ok,{WaitingCount2,_,_,_,_}=Acc2} = - handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1), - ?assertEqual(WaitingCount2,1), + mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]), + GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs), + Acc0 = { + length(Shards), + length(Docs), + list_to_integer("2"), + GroupedDocs, + dict:from_list([{Doc, []} || Doc <- Docs]) + }, + + {ok, {WaitingCount1, _, _, _, _} = Acc1} = + handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0), + ?assertEqual(WaitingCount1, 2), + + {ok, {WaitingCount2, _, _, _, _} = Acc2} = + handle_message({rexi_EXIT, 1}, lists:nth(2, Shards), Acc1), + ?assertEqual(WaitingCount2, 1), {stop, Reply} = - handle_message({ok, [{ok, Doc1},{ok, Doc2}]},lists:nth(3,Shards),Acc2), + handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, lists:nth(3, Shards), Acc2), - ?assertEqual({ok, [{Doc1, {ok, Doc2}},{Doc2, {ok,Doc1}}]},Reply). + ?assertEqual({ok, [{Doc1, {ok, Doc2}}, {Doc2, {ok, Doc1}}]}, Reply). % needed for testing to avoid having to start the mem3 application group_docs_by_shard_hack(_DbName, Shards, Docs) -> - dict:to_list(lists:foldl(fun(#doc{id=_Id} = Doc, D0) -> - lists:foldl(fun(Shard, D1) -> - dict:append(Shard, Doc, D1) - end, D0, Shards) - end, dict:new(), Docs)). + dict:to_list( + lists:foldl( + fun(#doc{id = _Id} = Doc, D0) -> + lists:foldl( + fun(Shard, D1) -> + dict:append(Shard, Doc, D1) + end, + D0, + Shards + ) + end, + dict:new(), + Docs + ) + ). -endif. |