summaryrefslogtreecommitdiff
path: root/src/fabric/src/fabric_doc_open.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/fabric/src/fabric_doc_open.erl')
-rw-r--r--src/fabric/src/fabric_doc_open.erl611
1 files changed, 0 insertions, 611 deletions
diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl
deleted file mode 100644
index ba348112c..000000000
--- a/src/fabric/src/fabric_doc_open.erl
+++ /dev/null
@@ -1,611 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_doc_open).
-
--export([go/3]).
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--record(acc, {
- dbname,
- workers,
- r,
- state,
- replies,
- node_revs = [],
- q_reply
-}).
-
-go(DbName, Id, Options) ->
- Handler =
- case proplists:get_value(doc_info, Options) of
- true -> get_doc_info;
- full -> get_full_doc_info;
- undefined -> open_doc
- end,
- Workers = fabric_util:submit_jobs(
- mem3:shards(DbName, Id),
- Handler,
- [Id, [deleted | Options]]
- ),
- SuppressDeletedDoc = not lists:member(deleted, Options),
- N = mem3:n(DbName),
- R = couch_util:get_value(r, Options, integer_to_list(mem3:quorum(DbName))),
- Acc0 = #acc{
- dbname = DbName,
- workers = Workers,
- r = erlang:min(N, list_to_integer(R)),
- state = r_not_met,
- replies = []
- },
- RexiMon = fabric_util:create_monitors(Workers),
- try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
- {ok, #acc{} = Acc} when Handler =:= open_doc ->
- Reply = handle_response(Acc),
- format_reply(Reply, SuppressDeletedDoc);
- {ok, #acc{state = r_not_met}} ->
- {error, quorum_not_met};
- {ok, #acc{q_reply = QuorumReply}} ->
- format_reply(QuorumReply, SuppressDeletedDoc);
- {timeout, #acc{workers = DefunctWorkers}} ->
- fabric_util:log_timeout(DefunctWorkers, atom_to_list(Handler)),
- {error, timeout};
- Error ->
- Error
- after
- rexi_monitor:stop(RexiMon)
- end.
-
-handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
- NewWorkers = [W || #shard{node = N} = W <- Acc#acc.workers, N /= Node],
- case NewWorkers of
- [] ->
- {stop, Acc#acc{workers = []}};
- _ ->
- {ok, Acc#acc{workers = NewWorkers}}
- end;
-handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
- NewWorkers = lists:delete(Worker, Acc#acc.workers),
- case NewWorkers of
- [] ->
- {stop, Acc#acc{workers = []}};
- _ ->
- {ok, Acc#acc{workers = NewWorkers}}
- end;
-handle_message(Reply, Worker, Acc) ->
- NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
- NewNodeRevs =
- case Reply of
- {ok, #doc{revs = {Pos, [Rev | _]}}} ->
- [{Worker#shard.node, [{Pos, Rev}]} | Acc#acc.node_revs];
- _ ->
- Acc#acc.node_revs
- end,
- NewAcc = Acc#acc{replies = NewReplies, node_revs = NewNodeRevs},
- case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
- {true, QuorumReply} ->
- fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
- {stop, NewAcc#acc{workers = [], state = r_met, q_reply = QuorumReply}};
- wait_for_more ->
- NewWorkers = lists:delete(Worker, Acc#acc.workers),
- {ok, NewAcc#acc{workers = NewWorkers}};
- no_more_workers ->
- {stop, NewAcc#acc{workers = []}}
- end.
-
-handle_response(#acc{state = r_met, replies = Replies, q_reply = QuorumReply} = Acc) ->
- case {Replies, fabric_util:remove_ancestors(Replies, [])} of
- {[_], [_]} ->
- % Complete agreement amongst all copies
- QuorumReply;
- {[_ | _], [{_, {QuorumReply, _}}]} ->
- % Any divergent replies are ancestors of the QuorumReply,
- % repair the document asynchronously
- spawn(fun() -> read_repair(Acc) end),
- QuorumReply;
- _Else ->
- % real disagreement amongst the workers, block for the repair
- read_repair(Acc)
- end;
-handle_response(Acc) ->
- read_repair(Acc).
-
-is_r_met(Workers, Replies, R) ->
- case lists:dropwhile(fun({_, {_, Count}}) -> Count < R end, Replies) of
- [{_, {QuorumReply, _}} | _] ->
- {true, QuorumReply};
- [] when length(Workers) > 1 ->
- wait_for_more;
- [] ->
- no_more_workers
- end.
-
-read_repair(#acc{dbname = DbName, replies = Replies, node_revs = NodeRevs}) ->
- Docs = [Doc || {_, {{ok, #doc{} = Doc}, _}} <- Replies],
- case Docs of
- % omit local docs from read repair
- [#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] ->
- choose_reply(Docs);
- [#doc{id = Id} | _] ->
- Opts = [?ADMIN_CTX, replicated_changes, {read_repair, NodeRevs}],
- Res = fabric:update_docs(DbName, Docs, Opts),
- case Res of
- {ok, []} ->
- couch_stats:increment_counter([fabric, read_repairs, success]);
- _ ->
- couch_stats:increment_counter([fabric, read_repairs, failure]),
- couch_log:notice("read_repair ~s ~s ~p", [DbName, Id, Res])
- end,
- choose_reply(Docs);
- [] ->
- % Try hard to return some sort of information
- % to the client.
- Values = [V || {_, {V, _}} <- Replies],
- case lists:member({not_found, missing}, Values) of
- true ->
- {not_found, missing};
- false when length(Values) > 0 ->
- % Sort for stability in responses in
- % case we have some weird condition
- hd(lists:sort(Values));
- false ->
- {error, read_failure}
- end
- end.
-
-choose_reply(Docs) ->
- % Sort descending by {not deleted, rev}. This should match
- % the logic of couch_doc:to_doc_info/1.
- [Winner | _] = lists:sort(
- fun(DocA, DocB) ->
- InfoA = {not DocA#doc.deleted, DocA#doc.revs},
- InfoB = {not DocB#doc.deleted, DocB#doc.revs},
- InfoA > InfoB
- end,
- Docs
- ),
- {ok, Winner}.
-
-format_reply({ok, #full_doc_info{deleted = true}}, true) ->
- {not_found, deleted};
-format_reply({ok, #doc{deleted = true}}, true) ->
- {not_found, deleted};
-format_reply(not_found, _) ->
- {not_found, missing};
-format_reply(Else, _) ->
- Else.
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
--define(MECK_MODS, [
- couch_log,
- couch_stats,
- fabric,
- fabric_util,
- mem3,
- rexi,
- rexi_monitor
-]).
-
-setup_all() ->
- meck:new(?MECK_MODS, [passthrough]).
-
-teardown_all(_) ->
- meck:unload().
-
-setup() ->
- meck:reset(?MECK_MODS).
-
-teardown(_) ->
- ok.
-
-open_doc_test_() ->
- {
- setup,
- fun setup_all/0,
- fun teardown_all/1,
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- t_is_r_met(),
- t_handle_message_down(),
- t_handle_message_exit(),
- t_handle_message_reply(),
- t_store_node_revs(),
- t_read_repair(),
- t_handle_response_quorum_met(),
- t_get_doc_info()
- ]
- }
- }.
-
-t_is_r_met() ->
- ?_test(begin
- Workers0 = [],
- Workers1 = [nil],
- Workers2 = [nil, nil],
-
- SuccessCases = [
- {{true, foo}, [fabric_util:kv(foo, 2)], 2},
- {{true, foo}, [fabric_util:kv(foo, 3)], 2},
- {{true, foo}, [fabric_util:kv(foo, 1)], 1},
- {{true, foo}, [fabric_util:kv(foo, 2), fabric_util:kv(bar, 1)], 2},
- {{true, bar}, [fabric_util:kv(bar, 1), fabric_util:kv(bar, 2)], 2},
- {{true, bar}, [fabric_util:kv(bar, 2), fabric_util:kv(foo, 1)], 2}
- ],
- lists:foreach(
- fun({Expect, Replies, Q}) ->
- ?assertEqual(Expect, is_r_met(Workers0, Replies, Q))
- end,
- SuccessCases
- ),
-
- WaitForMoreCases = [
- {[fabric_util:kv(foo, 1)], 2},
- {[fabric_util:kv(foo, 2)], 3},
- {[fabric_util:kv(foo, 1), fabric_util:kv(bar, 1)], 2}
- ],
- lists:foreach(
- fun({Replies, Q}) ->
- ?assertEqual(wait_for_more, is_r_met(Workers2, Replies, Q))
- end,
- WaitForMoreCases
- ),
-
- FailureCases = [
- {Workers0, [fabric_util:kv(foo, 1)], 2},
- {Workers1, [fabric_util:kv(foo, 1)], 2},
- {Workers1, [fabric_util:kv(foo, 1), fabric_util:kv(bar, 1)], 2},
- {Workers1, [fabric_util:kv(foo, 2)], 3}
- ],
- lists:foreach(
- fun({Workers, Replies, Q}) ->
- ?assertEqual(no_more_workers, is_r_met(Workers, Replies, Q))
- end,
- FailureCases
- )
- end).
-
-t_handle_message_down() ->
- Node0 = 'foo@localhost',
- Node1 = 'bar@localhost',
- Down0 = {rexi_DOWN, nil, {nil, Node0}, nil},
- Down1 = {rexi_DOWN, nil, {nil, Node1}, nil},
- Workers0 = [#shard{node = Node0} || _ <- [a, b]],
- Worker1 = #shard{node = Node1},
- Workers1 = Workers0 ++ [Worker1],
-
- ?_test(begin
- % Stop when no more workers are left
- ?assertEqual(
- {stop, #acc{workers = []}},
- handle_message(Down0, nil, #acc{workers = Workers0})
- ),
-
- % Continue when we have more workers
- ?assertEqual(
- {ok, #acc{workers = [Worker1]}},
- handle_message(Down0, nil, #acc{workers = Workers1})
- ),
-
- % A second DOWN removes the remaining workers
- ?assertEqual(
- {stop, #acc{workers = []}},
- handle_message(Down1, nil, #acc{workers = [Worker1]})
- )
- end).
-
-t_handle_message_exit() ->
- Exit = {rexi_EXIT, nil},
- Worker0 = #shard{ref = erlang:make_ref()},
- Worker1 = #shard{ref = erlang:make_ref()},
-
- ?_test(begin
- % Only removes the specified worker
- ?assertEqual(
- {ok, #acc{workers = [Worker1]}},
- handle_message(Exit, Worker0, #acc{workers = [Worker0, Worker1]})
- ),
-
- ?assertEqual(
- {ok, #acc{workers = [Worker0]}},
- handle_message(Exit, Worker1, #acc{workers = [Worker0, Worker1]})
- ),
-
- % We bail if it was the last worker
- ?assertEqual(
- {stop, #acc{workers = []}},
- handle_message(Exit, Worker0, #acc{workers = [Worker0]})
- )
- end).
-
-t_handle_message_reply() ->
- Worker0 = #shard{ref = erlang:make_ref()},
- Worker1 = #shard{ref = erlang:make_ref()},
- Worker2 = #shard{ref = erlang:make_ref()},
- Workers = [Worker0, Worker1, Worker2],
- Acc0 = #acc{workers = Workers, r = 2, replies = []},
-
- ?_test(begin
- meck:expect(rexi, kill_all, fun(_) -> ok end),
-
- % Test that we continue when we haven't met R yet
- ?assertMatch(
- {ok, #acc{
- workers = [Worker0, Worker1],
- replies = [{foo, {foo, 1}}]
- }},
- handle_message(foo, Worker2, Acc0)
- ),
-
- ?assertMatch(
- {ok, #acc{
- workers = [Worker0, Worker1],
- replies = [{bar, {bar, 1}}, {foo, {foo, 1}}]
- }},
- handle_message(bar, Worker2, Acc0#acc{
- replies = [{foo, {foo, 1}}]
- })
- ),
-
- % Test that we don't get a quorum when R isn't met. q_reply
- % isn't set and state remains unchanged and {stop, NewAcc}
- % is returned. Bit subtle on the assertions here.
-
- ?assertMatch(
- {stop, #acc{workers = [], replies = [{foo, {foo, 1}}]}},
- handle_message(foo, Worker0, Acc0#acc{workers = [Worker0]})
- ),
-
- ?assertMatch(
- {stop, #acc{
- workers = [],
- replies = [{bar, {bar, 1}}, {foo, {foo, 1}}]
- }},
- handle_message(bar, Worker0, Acc0#acc{
- workers = [Worker0],
- replies = [{foo, {foo, 1}}]
- })
- ),
-
- % Check that when R is met we stop with a new state and
- % a q_reply.
-
- ?assertMatch(
- {stop, #acc{
- workers = [],
- replies = [{foo, {foo, 2}}],
- state = r_met,
- q_reply = foo
- }},
- handle_message(foo, Worker1, Acc0#acc{
- workers = [Worker0, Worker1],
- replies = [{foo, {foo, 1}}]
- })
- ),
-
- ?assertEqual(
- {stop, #acc{
- workers = [],
- r = 1,
- replies = [{foo, {foo, 1}}],
- state = r_met,
- q_reply = foo
- }},
- handle_message(foo, Worker0, Acc0#acc{r = 1})
- ),
-
- ?assertMatch(
- {stop, #acc{
- workers = [],
- replies = [{bar, {bar, 1}}, {foo, {foo, 2}}],
- state = r_met,
- q_reply = foo
- }},
- handle_message(foo, Worker0, Acc0#acc{
- workers = [Worker0],
- replies = [{bar, {bar, 1}}, {foo, {foo, 1}}]
- })
- )
- end).
-
-t_store_node_revs() ->
- W1 = #shard{node = w1, ref = erlang:make_ref()},
- W2 = #shard{node = w2, ref = erlang:make_ref()},
- W3 = #shard{node = w3, ref = erlang:make_ref()},
- Foo1 = {ok, #doc{id = <<"bar">>, revs = {1, [<<"foo">>]}}},
- Foo2 = {ok, #doc{id = <<"bar">>, revs = {2, [<<"foo2">>, <<"foo">>]}}},
- NFM = {not_found, missing},
-
- InitAcc = #acc{workers = [W1, W2, W3], replies = [], r = 2},
-
- ?_test(begin
- meck:expect(rexi, kill_all, fun(_) -> ok end),
-
- % Simple case
- {ok, #acc{node_revs = NodeRevs1}} = handle_message(Foo1, W1, InitAcc),
- ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs1),
-
- % Make sure we only hold the head rev
- {ok, #acc{node_revs = NodeRevs2}} = handle_message(Foo2, W1, InitAcc),
- ?assertEqual([{w1, [{2, <<"foo2">>}]}], NodeRevs2),
-
- % Make sure we don't capture anything on error
- {ok, #acc{node_revs = NodeRevs3}} = handle_message(NFM, W1, InitAcc),
- ?assertEqual([], NodeRevs3),
-
- % Make sure we accumulate node revs
- Acc1 = InitAcc#acc{node_revs = [{w1, [{1, <<"foo">>}]}]},
- {ok, #acc{node_revs = NodeRevs4}} = handle_message(Foo2, W2, Acc1),
- ?assertEqual(
- [{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}],
- NodeRevs4
- ),
-
- % Make sure rexi_DOWN doesn't modify node_revs
- Down = {rexi_DOWN, nil, {nil, w1}, nil},
- {ok, #acc{node_revs = NodeRevs5}} = handle_message(Down, W2, Acc1),
- ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs5),
-
- % Make sure rexi_EXIT doesn't modify node_revs
- Exit = {rexi_EXIT, reason},
- {ok, #acc{node_revs = NodeRevs6}} = handle_message(Exit, W2, Acc1),
- ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs6),
-
- % Make sure an error doesn't remove any node revs
- {ok, #acc{node_revs = NodeRevs7}} = handle_message(NFM, W2, Acc1),
- ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs7),
-
- % Make sure we have all of our node_revs when meeting
- % quorum
- {ok, Acc2} = handle_message(Foo1, W1, InitAcc),
- {ok, Acc3} = handle_message(Foo2, W2, Acc2),
- {stop, Acc4} = handle_message(NFM, W3, Acc3),
- ?assertEqual(
- [{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}],
- Acc4#acc.node_revs
- )
- end).
-
-t_read_repair() ->
- Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
- Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
- NFM = {not_found, missing},
-
- ?_test(begin
- meck:expect(couch_log, notice, fun(_, _) -> ok end),
- meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
-
- % Test when we have actual doc data to repair
- meck:expect(fabric, update_docs, fun(_, [_], _) -> {ok, []} end),
- Acc0 = #acc{
- dbname = <<"name">>,
- replies = [fabric_util:kv(Foo1, 1)]
- },
- ?assertEqual(Foo1, read_repair(Acc0)),
-
- meck:expect(fabric, update_docs, fun(_, [_, _], _) -> {ok, []} end),
- Acc1 = #acc{
- dbname = <<"name">>,
- replies = [fabric_util:kv(Foo1, 1), fabric_util:kv(Foo2, 1)]
- },
- ?assertEqual(Foo2, read_repair(Acc1)),
-
- % Test when we have nothing but errors
- Acc2 = #acc{replies = [fabric_util:kv(NFM, 1)]},
- ?assertEqual(NFM, read_repair(Acc2)),
-
- Acc3 = #acc{replies = [fabric_util:kv(NFM, 1), fabric_util:kv(foo, 2)]},
- ?assertEqual(NFM, read_repair(Acc3)),
-
- Acc4 = #acc{replies = [fabric_util:kv(foo, 1), fabric_util:kv(bar, 1)]},
- ?assertEqual(bar, read_repair(Acc4))
- end).
-
-t_handle_response_quorum_met() ->
- Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
- Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
- Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
-
- ?_test(begin
- meck:expect(couch_log, notice, fun(_, _) -> ok end),
- meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, []} end),
- meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
-
- BasicOkAcc = #acc{
- state = r_met,
- replies = [fabric_util:kv(Foo1, 2)],
- q_reply = Foo1
- },
- ?assertEqual(Foo1, handle_response(BasicOkAcc)),
-
- WithAncestorsAcc = #acc{
- state = r_met,
- replies = [fabric_util:kv(Foo1, 1), fabric_util:kv(Foo2, 2)],
- q_reply = Foo2
- },
- ?assertEqual(Foo2, handle_response(WithAncestorsAcc)),
-
- % This also checks when the quorum isn't the most recent
- % revision.
- DeeperWinsAcc = #acc{
- state = r_met,
- replies = [fabric_util:kv(Foo1, 2), fabric_util:kv(Foo2, 1)],
- q_reply = Foo1
- },
- ?assertEqual(Foo2, handle_response(DeeperWinsAcc)),
-
- % Check that we return the proper doc based on rev
- % (ie, pos is equal)
- BiggerRevWinsAcc = #acc{
- state = r_met,
- replies = [fabric_util:kv(Foo1, 1), fabric_util:kv(Bar1, 2)],
- q_reply = Bar1
- },
- ?assertEqual(Foo1, handle_response(BiggerRevWinsAcc))
-
- % r_not_met is a proxy to read_repair so we rely on
- % read_repair_test for those conditions.
- end).
-
-t_get_doc_info() ->
- ?_test(begin
- meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, []} end),
- meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
- meck:expect(fabric_util, submit_jobs, fun(_, _, _) -> ok end),
- meck:expect(fabric_util, create_monitors, fun(_) -> ok end),
- meck:expect(rexi_monitor, stop, fun(_) -> ok end),
- meck:expect(mem3, shards, fun(_, _) -> ok end),
- meck:expect(mem3, n, fun(_) -> 3 end),
- meck:expect(mem3, quorum, fun(_) -> 2 end),
-
- meck:expect(fabric_util, recv, fun(_, _, _, _) ->
- {ok, #acc{state = r_not_met}}
- end),
- Rsp1 = fabric_doc_open:go("test", "one", [doc_info]),
- ?assertEqual({error, quorum_not_met}, Rsp1),
-
- Rsp2 = fabric_doc_open:go("test", "one", [{doc_info, full}]),
- ?assertEqual({error, quorum_not_met}, Rsp2),
-
- meck:expect(fabric_util, recv, fun(_, _, _, _) ->
- {ok, #acc{state = r_met, q_reply = not_found}}
- end),
- MissingRsp1 = fabric_doc_open:go("test", "one", [doc_info]),
- ?assertEqual({not_found, missing}, MissingRsp1),
- MissingRsp2 = fabric_doc_open:go("test", "one", [{doc_info, full}]),
- ?assertEqual({not_found, missing}, MissingRsp2),
-
- meck:expect(fabric_util, recv, fun(_, _, _, _) ->
- A = #doc_info{},
- {ok, #acc{state = r_met, q_reply = {ok, A}}}
- end),
- {ok, Rec1} = fabric_doc_open:go("test", "one", [doc_info]),
- ?assert(is_record(Rec1, doc_info)),
-
- meck:expect(fabric_util, recv, fun(_, _, _, _) ->
- A = #full_doc_info{deleted = true},
- {ok, #acc{state = r_met, q_reply = {ok, A}}}
- end),
- Rsp3 = fabric_doc_open:go("test", "one", [{doc_info, full}]),
- ?assertEqual({not_found, deleted}, Rsp3),
- {ok, Rec2} = fabric_doc_open:go("test", "one", [{doc_info, full}, deleted]),
- ?assert(is_record(Rec2, full_doc_info))
- end).
-
--endif.