author     Paul J. Davis <paul.joseph.davis@gmail.com>  2018-05-30 17:26:02 -0500
committer  jiangph <jiangph@cn.ibm.com>                 2018-08-22 00:59:16 +0800
commit     0234679661383fd71dd7d15a51d4454d138f0e90 (patch)
tree       533e864e1835694a9f60eaae8793e496f47b7723
parent     c625933fc3c69d300da3d6dbb2ffb5d3c8a26499 (diff)
[08/10] Clustered Purge: Update read-repair
Read-repair needs to know which nodes have requested an update to a local doc so that it can determine if the update is applied. The basic idea here is that we may have gotten an update from a remote node that has yet to apply a purge request. If the local node were to apply this update it would effectively undo a successful purge request.

COUCHDB-3326

Co-authored-by: Mayya Sharipova <mayyas@ca.ibm.com>
Co-authored-by: jiangphcn <jiangph@cn.ibm.com>
-rw-r--r--  src/fabric/src/fabric_doc_open.erl         |  73
-rw-r--r--  src/fabric/src/fabric_doc_open_revs.erl    | 262
-rw-r--r--  src/fabric/src/fabric_rpc.erl              | 128
-rw-r--r--  src/fabric/test/fabric_rpc_purge_tests.erl | 285
4 files changed, 692 insertions(+), 56 deletions(-)
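
The coordinator-side change boils down to two pieces. A minimal sketch (condensed from the diff below, not a drop-in excerpt; function names here are illustrative, while #shard{} and #doc{} are the existing fabric/couch records):

    % Tag each successful worker reply with the node it came from and the
    % head revision it reported; error replies leave the list untouched.
    track_reply({ok, #doc{revs = {Pos, [Rev | _]}}}, Worker, NodeRevs) ->
        [{Worker#shard.node, [{Pos, Rev}]} | NodeRevs];
    track_reply(_ErrorReply, _Worker, NodeRevs) ->
        NodeRevs.

    % Read repair then forwards that provenance to the update path in place
    % of the old bare replicated_changes flag.
    read_repair(DbName, Docs, NodeRevs) ->
        Opts = [?ADMIN_CTX, {read_repair, NodeRevs}],
        fabric:update_docs(DbName, Docs, Opts).

On the receiving side, fabric_rpc:update_docs/3 treats a read_repair option as replicated_changes, but first drops any revision that the local shard has purged since it last checkpointed with the node that reported it.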
diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl
index 93f73a821..0a85346f7 100644
--- a/src/fabric/src/fabric_doc_open.erl
+++ b/src/fabric/src/fabric_doc_open.erl
@@ -25,6 +25,7 @@
r,
state,
replies,
+ node_revs = [],
q_reply
}).
@@ -83,7 +84,13 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
end;
handle_message(Reply, Worker, Acc) ->
NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
- NewAcc = Acc#acc{replies = NewReplies},
+ NewNodeRevs = case Reply of
+ {ok, #doc{revs = {Pos, [Rev | _]}}} ->
+ [{Worker#shard.node, [{Pos, Rev}]} | Acc#acc.node_revs];
+ _ ->
+ Acc#acc.node_revs
+ end,
+ NewAcc = Acc#acc{replies = NewReplies, node_revs = NewNodeRevs},
case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
{true, QuorumReply} ->
fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
@@ -122,14 +129,14 @@ is_r_met(Workers, Replies, R) ->
no_more_workers
end.
-read_repair(#acc{dbname=DbName, replies=Replies}) ->
+read_repair(#acc{dbname=DbName, replies=Replies, node_revs=NodeRevs}) ->
Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies],
case Docs of
% omit local docs from read repair
[#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] ->
choose_reply(Docs);
[#doc{id=Id} | _] ->
- Opts = [replicated_changes, ?ADMIN_CTX],
+ Opts = [?ADMIN_CTX, {read_repair, NodeRevs}],
Res = fabric:update_docs(DbName, Docs, Opts),
case Res of
{ok, []} ->
@@ -205,6 +212,7 @@ open_doc_test_() ->
t_handle_message_down(),
t_handle_message_exit(),
t_handle_message_reply(),
+ t_store_node_revs(),
t_read_repair(),
t_handle_response_quorum_met(),
t_get_doc_info()
@@ -397,6 +405,65 @@ t_handle_message_reply() ->
end).
+t_store_node_revs() ->
+ W1 = #shard{node = w1, ref = erlang:make_ref()},
+ W2 = #shard{node = w2, ref = erlang:make_ref()},
+ W3 = #shard{node = w3, ref = erlang:make_ref()},
+ Foo1 = {ok, #doc{id = <<"bar">>, revs = {1, [<<"foo">>]}}},
+ Foo2 = {ok, #doc{id = <<"bar">>, revs = {2, [<<"foo2">>, <<"foo">>]}}},
+ NFM = {not_found, missing},
+
+ InitAcc = #acc{workers = [W1, W2, W3], replies = [], r = 2},
+
+ ?_test(begin
+ meck:expect(rexi, kill, fun(_, _) -> ok end),
+
+ % Simple case
+ {ok, #acc{node_revs = NodeRevs1}} = handle_message(Foo1, W1, InitAcc),
+ ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs1),
+
+ % Make sure we only hold the head rev
+ {ok, #acc{node_revs = NodeRevs2}} = handle_message(Foo2, W1, InitAcc),
+ ?assertEqual([{w1, [{2, <<"foo2">>}]}], NodeRevs2),
+
+ % Make sure we don't capture anything on error
+ {ok, #acc{node_revs = NodeRevs3}} = handle_message(NFM, W1, InitAcc),
+ ?assertEqual([], NodeRevs3),
+
+ % Make sure we accumulate node revs
+ Acc1 = InitAcc#acc{node_revs = [{w1, [{1, <<"foo">>}]}]},
+ {ok, #acc{node_revs = NodeRevs4}} = handle_message(Foo2, W2, Acc1),
+ ?assertEqual(
+ [{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}],
+ NodeRevs4
+ ),
+
+ % Make sure rexi_DOWN doesn't modify node_revs
+ Down = {rexi_DOWN, nil, {nil, w1}, nil},
+ {ok, #acc{node_revs = NodeRevs5}} = handle_message(Down, W2, Acc1),
+ ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs5),
+
+ % Make sure rexi_EXIT doesn't modify node_revs
+ Exit = {rexi_EXIT, reason},
+ {ok, #acc{node_revs = NodeRevs6}} = handle_message(Exit, W2, Acc1),
+ ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs6),
+
+ % Make sure an error doesn't remove any node revs
+ {ok, #acc{node_revs = NodeRevs7}} = handle_message(NFM, W2, Acc1),
+ ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs7),
+
+ % Make sure we have all of our node_revs when meeting
+ % quorum
+ {ok, Acc2} = handle_message(Foo1, W1, InitAcc),
+ {ok, Acc3} = handle_message(Foo2, W2, Acc2),
+ {stop, Acc4} = handle_message(NFM, W3, Acc3),
+ ?assertEqual(
+ [{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}],
+ Acc4#acc.node_revs
+ )
+ end).
+
+
t_read_repair() ->
Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}},
Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}},
diff --git a/src/fabric/src/fabric_doc_open_revs.erl b/src/fabric/src/fabric_doc_open_revs.erl
index 096722fa0..234b108ef 100644
--- a/src/fabric/src/fabric_doc_open_revs.erl
+++ b/src/fabric/src/fabric_doc_open_revs.erl
@@ -29,6 +29,7 @@
revs,
latest,
replies = [],
+ node_revs = [],
repair = false
}).
@@ -82,6 +83,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
worker_count = WorkerCount,
workers = Workers,
replies = PrevReplies,
+ node_revs = PrevNodeRevs,
r = R,
revs = Revs,
latest = Latest,
@@ -92,7 +94,6 @@ handle_message({ok, RawReplies}, Worker, State) ->
IsTree = Revs == all orelse Latest,
% Do not count error replies when checking quorum
-
RealReplyCount = ReplyCount + 1 - ReplyErrorCount,
QuorumReplies = RealReplyCount >= R,
{NewReplies, QuorumMet, Repair} = case IsTree of
@@ -102,11 +103,23 @@ handle_message({ok, RawReplies}, Worker, State) ->
NumLeafs = couch_key_tree:count_leafs(PrevReplies),
SameNumRevs = length(RawReplies) == NumLeafs,
QMet = AllInternal andalso SameNumRevs andalso QuorumReplies,
- {NewReplies0, QMet, Repair0};
+ % Don't set repair=true on the first reply
+ {NewReplies0, QMet, (ReplyCount > 0) and Repair0};
false ->
{NewReplies0, MinCount} = dict_replies(PrevReplies, RawReplies),
{NewReplies0, MinCount >= R, false}
end,
+ NewNodeRevs = if Worker == nil -> PrevNodeRevs; true ->
+ IdRevs = lists:foldl(fun
+ ({ok, #doc{revs = {Pos, [Rev | _]}}}, Acc) ->
+ [{Pos, Rev} | Acc];
+ (_, Acc) ->
+ Acc
+ end, [], RawReplies),
+ if IdRevs == [] -> PrevNodeRevs; true ->
+ [{Worker#shard.node, IdRevs} | PrevNodeRevs]
+ end
+ end,
Complete = (ReplyCount =:= (WorkerCount - 1)),
@@ -117,6 +130,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
DbName,
IsTree,
NewReplies,
+ NewNodeRevs,
ReplyCount + 1,
InRepair orelse Repair
),
@@ -124,6 +138,7 @@ handle_message({ok, RawReplies}, Worker, State) ->
false ->
{ok, State#state{
replies = NewReplies,
+ node_revs = NewNodeRevs,
reply_count = ReplyCount + 1,
workers = lists:delete(Worker, Workers),
repair = InRepair orelse Repair
@@ -180,7 +195,7 @@ dict_replies(Dict, [Reply | Rest]) ->
dict_replies(NewDict, Rest).
-maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
+maybe_read_repair(Db, IsTree, Replies, NodeRevs, ReplyCount, DoRepair) ->
Docs = case IsTree of
true -> tree_repair_docs(Replies, DoRepair);
false -> dict_repair_docs(Replies, ReplyCount)
@@ -189,7 +204,7 @@ maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) ->
[] ->
ok;
_ ->
- erlang:spawn(fun() -> read_repair(Db, Docs) end)
+ erlang:spawn(fun() -> read_repair(Db, Docs, NodeRevs) end)
end.
@@ -208,8 +223,9 @@ dict_repair_docs(Replies, ReplyCount) ->
end.
-read_repair(Db, Docs) ->
- Res = fabric:update_docs(Db, Docs, [replicated_changes, ?ADMIN_CTX]),
+read_repair(Db, Docs, NodeRevs) ->
+ Opts = [?ADMIN_CTX, {read_repair, NodeRevs}],
+ Res = fabric:update_docs(Db, Docs, Opts),
case Res of
{ok, []} ->
couch_stats:increment_counter([fabric, read_repairs, success]);
@@ -268,20 +284,24 @@ filter_reply(Replies) ->
setup() ->
config:start_link([]),
meck:new([fabric, couch_stats, couch_log]),
+ meck:new(fabric_util, [passthrough]),
meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end),
meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
- meck:expect(couch_log, notice, fun(_, _) -> ok end).
+ meck:expect(couch_log, notice, fun(_, _) -> ok end),
+ meck:expect(fabric_util, cleanup, fun(_) -> ok end).
+
teardown(_) ->
- (catch meck:unload([fabric, couch_stats, couch_log])),
+ (catch meck:unload([fabric, couch_stats, couch_log, fabric_util])),
config:stop().
state0(Revs, Latest) ->
#state{
worker_count = 3,
- workers = [w1, w2, w3],
+ workers =
+ [#shard{node='node1'}, #shard{node='node2'}, #shard{node='node3'}],
r = 2,
revs = Revs,
latest = Latest
@@ -321,6 +341,14 @@ open_doc_revs_test_() ->
check_worker_error_skipped(),
check_quorum_only_counts_valid_responses(),
check_empty_list_when_no_workers_reply(),
+ check_node_rev_stored(),
+ check_node_rev_store_head_only(),
+ check_node_rev_store_multiple(),
+ check_node_rev_dont_store_errors(),
+ check_node_rev_store_non_errors(),
+ check_node_rev_store_concatenate(),
+ check_node_rev_store_concatenate_multiple(),
+ check_node_rev_unmodified_on_down_or_exit(),
check_not_found_replies_are_removed_when_doc_found(),
check_not_found_returned_when_one_of_docs_not_found(),
check_not_found_returned_when_doc_not_found()
@@ -334,27 +362,35 @@ open_doc_revs_test_() ->
check_empty_response_not_quorum() ->
% Simple smoke test that we don't think we're
% done with a first empty response
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
?_assertMatch(
- {ok, #state{workers = [w2, w3]}},
- handle_message({ok, []}, w1, state0(all, false))
+ {ok, #state{workers = [W2, W3]}},
+ handle_message({ok, []}, W1, state0(all, false))
).
check_basic_response() ->
% Check that we've handled a response
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
?_assertMatch(
- {ok, #state{reply_count = 1, workers = [w2, w3]}},
- handle_message({ok, [foo1(), bar1()]}, w1, state0(all, false))
+ {ok, #state{reply_count = 1, workers = [W2, W3]}},
+ handle_message({ok, [foo1(), bar1()]}, W1, state0(all, false))
).
check_finish_quorum() ->
% Two messages with the same revisions means we're done
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
S0 = state0(all, false),
- {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+ {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
Expect = {stop, [bar1(), foo1()]},
- ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, w2, S1))
+ ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, W2, S1))
end).
@@ -363,11 +399,13 @@ check_finish_quorum_newer() ->
% foo1 should count for foo2 which means we're finished.
% We also validate that read_repair was triggered.
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
S0 = state0(all, false),
- {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+ {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
Expect = {stop, [bar1(), foo2()]},
ok = meck:reset(fabric),
- ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, w2, S1)),
+ ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, W2, S1)),
ok = meck:wait(fabric, update_docs, '_', 5000),
?assertMatch(
[{_, {fabric, update_docs, [_, _, _]}, _}],
@@ -380,11 +418,14 @@ check_no_quorum_on_second() ->
% Quorum not yet met for the foo revision so we
% would wait for w3
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
S0 = state0(all, false),
- {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
+ {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
?assertMatch(
- {ok, #state{workers = [w3]}},
- handle_message({ok, [bar1()]}, w2, S1)
+ {ok, #state{workers = [W3]}},
+ handle_message({ok, [bar1()]}, W2, S1)
)
end).
@@ -394,11 +435,14 @@ check_done_on_third() ->
% what. Every revision seen in this pattern should be
% included.
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
S0 = state0(all, false),
- {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0),
- {ok, S2} = handle_message({ok, [bar1()]}, w2, S1),
+ {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0),
+ {ok, S2} = handle_message({ok, [bar1()]}, W2, S1),
Expect = {stop, [bar1(), foo1()]},
- ?assertEqual(Expect, handle_message({ok, [bar1()]}, w3, S2))
+ ?assertEqual(Expect, handle_message({ok, [bar1()]}, W3, S2))
end).
@@ -407,108 +451,234 @@ check_done_on_third() ->
check_specific_revs_first_msg() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
S0 = state0(revs(), false),
?assertMatch(
- {ok, #state{reply_count = 1, workers = [w2, w3]}},
- handle_message({ok, [foo1(), bar1(), bazNF()]}, w1, S0)
+ {ok, #state{reply_count = 1, workers = [W2, W3]}},
+ handle_message({ok, [foo1(), bar1(), bazNF()]}, W1, S0)
)
end).
check_revs_done_on_agreement() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
S0 = state0(revs(), false),
Msg = {ok, [foo1(), bar1(), bazNF()]},
- {ok, S1} = handle_message(Msg, w1, S0),
+ {ok, S1} = handle_message(Msg, W1, S0),
Expect = {stop, [bar1(), foo1(), bazNF()]},
- ?assertEqual(Expect, handle_message(Msg, w2, S1))
+ ?assertEqual(Expect, handle_message(Msg, W2, S1))
end).
check_latest_true() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
S0 = state0(revs(), true),
Msg1 = {ok, [foo2(), bar1(), bazNF()]},
Msg2 = {ok, [foo2(), bar1(), bazNF()]},
- {ok, S1} = handle_message(Msg1, w1, S0),
+ {ok, S1} = handle_message(Msg1, W1, S0),
Expect = {stop, [bar1(), foo2(), bazNF()]},
- ?assertEqual(Expect, handle_message(Msg2, w2, S1))
+ ?assertEqual(Expect, handle_message(Msg2, W2, S1))
end).
check_ancestor_counted_in_quorum() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
S0 = state0(revs(), true),
Msg1 = {ok, [foo1(), bar1(), bazNF()]},
Msg2 = {ok, [foo2(), bar1(), bazNF()]},
Expect = {stop, [bar1(), foo2(), bazNF()]},
% Older first
- {ok, S1} = handle_message(Msg1, w1, S0),
- ?assertEqual(Expect, handle_message(Msg2, w2, S1)),
+ {ok, S1} = handle_message(Msg1, W1, S0),
+ ?assertEqual(Expect, handle_message(Msg2, W2, S1)),
% Newer first
- {ok, S2} = handle_message(Msg2, w2, S0),
- ?assertEqual(Expect, handle_message(Msg1, w1, S2))
+ {ok, S2} = handle_message(Msg2, W2, S0),
+ ?assertEqual(Expect, handle_message(Msg1, W1, S2))
end).
check_not_found_counts_for_descendant() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
S0 = state0(revs(), true),
Msg1 = {ok, [foo1(), bar1(), bazNF()]},
Msg2 = {ok, [foo1(), bar1(), baz1()]},
Expect = {stop, [bar1(), baz1(), foo1()]},
% not_found first
- {ok, S1} = handle_message(Msg1, w1, S0),
- ?assertEqual(Expect, handle_message(Msg2, w2, S1)),
+ {ok, S1} = handle_message(Msg1, W1, S0),
+ ?assertEqual(Expect, handle_message(Msg2, W2, S1)),
% not_found second
- {ok, S2} = handle_message(Msg2, w2, S0),
- ?assertEqual(Expect, handle_message(Msg1, w1, S2))
+ {ok, S2} = handle_message(Msg2, W2, S0),
+ ?assertEqual(Expect, handle_message(Msg1, W1, S2))
end).
check_worker_error_skipped() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
S0 = state0(revs(), true),
Msg1 = {ok, [foo1(), bar1(), baz1()]},
Msg2 = {rexi_EXIT, reason},
Msg3 = {ok, [foo1(), bar1(), baz1()]},
Expect = {stop, [bar1(), baz1(), foo1()]},
- {ok, S1} = handle_message(Msg1, w1, S0),
- {ok, S2} = handle_message(Msg2, w2, S1),
- ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+ {ok, S1} = handle_message(Msg1, W1, S0),
+ {ok, S2} = handle_message(Msg2, W2, S1),
+ ?assertEqual(Expect, handle_message(Msg3, W3, S2))
end).
check_quorum_only_counts_valid_responses() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
S0 = state0(revs(), true),
Msg1 = {rexi_EXIT, reason},
Msg2 = {rexi_EXIT, reason},
Msg3 = {ok, [foo1(), bar1(), baz1()]},
Expect = {stop, [bar1(), baz1(), foo1()]},
- {ok, S1} = handle_message(Msg1, w1, S0),
- {ok, S2} = handle_message(Msg2, w2, S1),
- ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+ {ok, S1} = handle_message(Msg1, W1, S0),
+ {ok, S2} = handle_message(Msg2, W2, S1),
+ ?assertEqual(Expect, handle_message(Msg3, W3, S2))
end).
check_empty_list_when_no_workers_reply() ->
?_test(begin
+ W1 = #shard{node='node1'},
+ W2 = #shard{node='node2'},
+ W3 = #shard{node='node3'},
S0 = state0(revs(), true),
Msg1 = {rexi_EXIT, reason},
Msg2 = {rexi_EXIT, reason},
Msg3 = {rexi_DOWN, nodedown, {nil, node()}, nil},
Expect = {stop, all_workers_died},
- {ok, S1} = handle_message(Msg1, w1, S0),
- {ok, S2} = handle_message(Msg2, w2, S1),
- ?assertEqual(Expect, handle_message(Msg3, w3, S2))
+ {ok, S1} = handle_message(Msg1, W1, S0),
+ {ok, S2} = handle_message(Msg2, W2, S1),
+ ?assertEqual(Expect, handle_message(Msg3, W3, S2))
+ end).
+
+
+check_node_rev_stored() ->
+ ?_test(begin
+ W1 = #shard{node = node1},
+ S0 = state0([], true),
+
+ {ok, S1} = handle_message({ok, [foo1()]}, W1, S0),
+ ?assertEqual([{node1, [{1, <<"foo">>}]}], S1#state.node_revs)
+ end).
+
+
+check_node_rev_store_head_only() ->
+ ?_test(begin
+ W1 = #shard{node = node1},
+ S0 = state0([], true),
+
+ {ok, S1} = handle_message({ok, [foo2()]}, W1, S0),
+ ?assertEqual([{node1, [{2, <<"foo2">>}]}], S1#state.node_revs)
+ end).
+
+
+check_node_rev_store_multiple() ->
+ ?_test(begin
+ W1 = #shard{node = node1},
+ S0 = state0([], true),
+
+ {ok, S1} = handle_message({ok, [foo1(), foo2()]}, W1, S0),
+ ?assertEqual(
+ [{node1, [{2, <<"foo2">>}, {1, <<"foo">>}]}],
+ S1#state.node_revs
+ )
+ end).
+
+
+check_node_rev_dont_store_errors() ->
+ ?_test(begin
+ W1 = #shard{node = node1},
+ S0 = state0([], true),
+
+ {ok, S1} = handle_message({ok, [barNF()]}, W1, S0),
+ ?assertEqual([], S1#state.node_revs)
+ end).
+
+
+check_node_rev_store_non_errors() ->
+ ?_test(begin
+ W1 = #shard{node = node1},
+ S0 = state0([], true),
+
+ {ok, S1} = handle_message({ok, [foo1(), barNF()]}, W1, S0),
+ ?assertEqual([{node1, [{1, <<"foo">>}]}], S1#state.node_revs)
+ end).
+
+
+check_node_rev_store_concatenate() ->
+ ?_test(begin
+ W2 = #shard{node = node2},
+ S0 = state0([], true),
+ S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]},
+
+ {ok, S2} = handle_message({ok, [foo2()]}, W2, S1),
+ ?assertEqual(
+ [{node2, [{2, <<"foo2">>}]}, {node1, [{1, <<"foo">>}]}],
+ S2#state.node_revs
+ )
+ end).
+
+
+check_node_rev_store_concatenate_multiple() ->
+ ?_test(begin
+ W2 = #shard{node = node2},
+ S0 = state0([], true),
+ S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]},
+
+ {ok, S2} = handle_message({ok, [foo2(), bar1()]}, W2, S1),
+ ?assertEqual(
+ [
+ {node2, [{1, <<"bar">>}, {2, <<"foo2">>}]},
+ {node1, [{1, <<"foo">>}]}
+ ],
+ S2#state.node_revs
+ )
+ end).
+
+
+check_node_rev_unmodified_on_down_or_exit() ->
+ ?_test(begin
+ W2 = #shard{node = node2},
+ S0 = state0([], true),
+ S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]},
+
+ Down = {rexi_DOWN, nodedown, {nil, node()}, nil},
+ {ok, S2} = handle_message(Down, W2, S1),
+ ?assertEqual(
+ [{node1, [{1, <<"foo">>}]}],
+ S2#state.node_revs
+ ),
+
+ Exit = {rexi_EXIT, reason},
+ {ok, S3} = handle_message(Exit, W2, S1),
+ ?assertEqual(
+ [{node1, [{1, <<"foo">>}]}],
+ S3#state.node_revs
+ )
end).
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index ef4092d56..c68422969 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -21,6 +21,7 @@
delete_shard_db_doc/2]).
-export([get_all_security/2, open_shard/2]).
-export([compact/1, compact/2]).
+-export([get_purge_seq/2, purge_docs/3, set_purge_infos_limit/3]).
-export([get_db_info/2, get_doc_count/2, get_update_seq/2,
changes/4, map_view/5, reduce_view/5, group_info/3, update_mrview/4]).
@@ -202,6 +203,9 @@ get_all_security(DbName, Options) ->
set_revs_limit(DbName, Limit, Options) ->
with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
+set_purge_infos_limit(DbName, Limit, Options) ->
+ with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}).
+
open_doc(DbName, DocId, Options) ->
with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
@@ -237,14 +241,26 @@ get_missing_revs(DbName, IdRevsList, Options) ->
end).
update_docs(DbName, Docs0, Options) ->
- case proplists:get_value(replicated_changes, Options) of
- true ->
- X = replicated_changes;
- _ ->
- X = interactive_edit
+ {Docs1, Type} = case couch_util:get_value(read_repair, Options) of
+ NodeRevs when is_list(NodeRevs) ->
+ Filtered = read_repair_filter(DbName, Docs0, NodeRevs, Options),
+ {Filtered, replicated_changes};
+ undefined ->
+ X = case proplists:get_value(replicated_changes, Options) of
+ true -> replicated_changes;
+ _ -> interactive_edit
+ end,
+ {Docs0, X}
end,
- Docs = make_att_readers(Docs0),
- with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
+ Docs2 = make_att_readers(Docs1),
+ with_db(DbName, Options, {couch_db, update_docs, [Docs2, Options, Type]}).
+
+
+get_purge_seq(DbName, Options) ->
+ with_db(DbName, Options, {couch_db, get_purge_seq, []}).
+
+purge_docs(DbName, UUIdsIdsRevs, Options) ->
+ with_db(DbName, Options, {couch_db, purge_docs, [UUIdsIdsRevs, Options]}).
%% @equiv group_info(DbName, DDocId, [])
group_info(DbName, DDocId) ->
@@ -299,6 +315,104 @@ with_db(DbName, Options, {M,F,A}) ->
rexi:reply(Error)
end.
+
+read_repair_filter(DbName, Docs, NodeRevs, Options) ->
+ set_io_priority(DbName, Options),
+ case get_or_create_db(DbName, Options) of
+ {ok, Db} ->
+ try
+ read_repair_filter(Db, Docs, NodeRevs)
+ after
+ couch_db:close(Db)
+ end;
+ Error ->
+ rexi:reply(Error)
+ end.
+
+
+% A read repair operation may have been triggered by a node
+% that was out of sync with the local node. Thus, any time
+% we receive a read repair request we need to check if we
+% may have recently purged any of the given revisions and
+% ignore them if so.
+%
+% This is accomplished by looking at the purge infos that we
+% have locally that have not been replicated to the remote
+% node. The logic here is that we may have received the purge
+% request before the remote shard copy. So to check that we
+% need to look at the purge infos that we have locally but
+% have not yet sent to the remote copy.
+%
+% NodeRevs is a list of the {node(), [rev()]} tuples passed
+% as the read_repair option to update_docs.
+read_repair_filter(Db, Docs, NodeRevs) ->
+ [#doc{id = DocId} | _] = Docs,
+ Nodes = lists:usort([Node || {Node, _} <- NodeRevs, Node /= node()]),
+ NodeSeqs = get_node_seqs(Db, Nodes),
+
+ DbPSeq = couch_db:get_purge_seq(Db),
+ Lag = config:get_integer("couchdb", "read_repair_lag", 100),
+
+ % Filter out read-repair updates from any node that is
+ % so out of date that it would force us to scan a large
+ % number of purge infos
+ NodeFiltFun = fun({Node, _Revs}) ->
+ {Node, NodeSeq} = lists:keyfind(Node, 1, NodeSeqs),
+ NodeSeq >= DbPSeq - Lag
+ end,
+ RecentNodeRevs = lists:filter(NodeFiltFun, NodeRevs),
+
+ % For each node we scan the purge infos to filter out any
+ % revisions that have been locally purged since we last
+ % replicated to the remote node's shard copy.
+ AllowableRevs = lists:foldl(fun({Node, Revs}, RevAcc) ->
+ {Node, StartSeq} = lists:keyfind(Node, 1, NodeSeqs),
+ FoldFun = fun({_PSeq, _UUID, PDocId, PRevs}, InnerAcc) ->
+ if PDocId /= DocId -> {ok, InnerAcc}; true ->
+ {ok, InnerAcc -- PRevs}
+ end
+ end,
+ {ok, FiltRevs} = couch_db:fold_purge_infos(Db, StartSeq, FoldFun, Revs),
+ lists:usort(FiltRevs ++ RevAcc)
+ end, [], RecentNodeRevs),
+
+ % Finally, filter the doc updates to only include revisions
+ % that have not been purged locally.
+ DocFiltFun = fun(#doc{revs = {Pos, [Rev | _]}}) ->
+ lists:member({Pos, Rev}, AllowableRevs)
+ end,
+ lists:filter(DocFiltFun, Docs).
+
+
+get_node_seqs(Db, Nodes) ->
+ % Gather the list of {Node, PurgeSeq} pairs for all nodes
+ % that are present in our read repair group
+ FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
+ case Id of
+ <<?LOCAL_DOC_PREFIX, "purge-mem3-", _/binary>> ->
+ TgtNode = couch_util:get_value(<<"target_node">>, Props),
+ PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props),
+ case lists:keyfind(TgtNode, 1, Acc) of
+ {_, OldSeq} ->
+ NewSeq = erlang:max(OldSeq, PurgeSeq),
+ NewEntry = {TgtNode, NewSeq},
+ NewAcc = lists:keyreplace(TgtNode, 1, Acc, NewEntry),
+ {ok, NewAcc};
+ false ->
+ {ok, Acc}
+ end;
+ _ ->
+ % We've processed all _local mem3 purge docs
+ {stop, Acc}
+ end
+ end,
+ InitAcc = [{list_to_binary(atom_to_list(Node)), 0} || Node <- Nodes],
+ Opts = [{start_key, <<?LOCAL_DOC_PREFIX, "purge-mem3-">>}],
+ {ok, NodeBinSeqs} = couch_db:fold_local_docs(Db, FoldFun, InitAcc, Opts),
+ [{list_to_existing_atom(binary_to_list(N)), S} || {N, S} <- NodeBinSeqs].
+
+
+
get_or_create_db(DbName, Options) ->
couch_db:open_int(DbName, [{create_if_missing, true} | Options]).
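
The new test module below drives this path end to end through rexi. Condensed from its helpers (the node name is the tests' hypothetical target node), a read-repair push of a single revision looks like:

    % Offer Doc's head revision as a read repair attributed to foo@127.0.0.1;
    % fabric_rpc filters it against local purge checkpoints before applying.
    {Pos, [Rev | _]} = Doc#doc.revs,
    RROpt = {read_repair, [{'foo@127.0.0.1', [{Pos, Rev}]}]},
    fabric_rpc:update_docs(DbName, [Doc], [RROpt]).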
diff --git a/src/fabric/test/fabric_rpc_purge_tests.erl b/src/fabric/test/fabric_rpc_purge_tests.erl
new file mode 100644
index 000000000..26507cf0b
--- /dev/null
+++ b/src/fabric/test/fabric_rpc_purge_tests.erl
@@ -0,0 +1,285 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_rpc_purge_tests).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(TDEF(A), {A, fun A/1}).
+
+% TODO: Add tests:
+% - filter some updates
+% - allow for an update that was filtered by a node
+% - ignore lagging nodes
+
+main_test_() ->
+ {
+ setup,
+ spawn,
+ fun setup_all/0,
+ fun teardown_all/1,
+ [
+ {
+ foreach,
+ fun setup_no_purge/0,
+ fun teardown_no_purge/1,
+ lists:map(fun wrap/1, [
+ ?TDEF(t_no_purge_no_filter)
+ ])
+ },
+ {
+ foreach,
+ fun setup_single_purge/0,
+ fun teardown_single_purge/1,
+ lists:map(fun wrap/1, [
+ ?TDEF(t_filter),
+ ?TDEF(t_filter_unknown_node),
+ ?TDEF(t_no_filter_old_node),
+ ?TDEF(t_no_filter_different_node),
+ ?TDEF(t_no_filter_after_repl)
+ ])
+ },
+ {
+ foreach,
+ fun setup_multi_purge/0,
+ fun teardown_multi_purge/1,
+ lists:map(fun wrap/1, [
+ ?TDEF(t_filter),
+ ?TDEF(t_filter_unknown_node),
+ ?TDEF(t_no_filter_old_node),
+ ?TDEF(t_no_filter_different_node),
+ ?TDEF(t_no_filter_after_repl)
+ ])
+ }
+ ]
+ }.
+
+
+setup_all() ->
+ test_util:start_couch().
+
+
+teardown_all(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+setup_no_purge() ->
+ {ok, Db} = create_db(),
+ populate_db(Db),
+ couch_db:name(Db).
+
+
+teardown_no_purge(DbName) ->
+ ok = couch_server:delete(DbName, []).
+
+
+setup_single_purge() ->
+ DbName = setup_no_purge(),
+ DocId = <<"0003">>,
+ {ok, OldDoc} = open_doc(DbName, DocId),
+ purge_doc(DbName, DocId),
+ {DbName, DocId, OldDoc, 1}.
+
+
+teardown_single_purge({DbName, _, _, _}) ->
+ teardown_no_purge(DbName).
+
+
+setup_multi_purge() ->
+ DbName = setup_no_purge(),
+ DocId = <<"0003">>,
+ {ok, OldDoc} = open_doc(DbName, DocId),
+ lists:foreach(fun(I) ->
+ PDocId = iolist_to_binary(io_lib:format("~4..0b", [I])),
+ purge_doc(DbName, PDocId)
+ end, lists:seq(1, 5)),
+ {DbName, DocId, OldDoc, 3}.
+
+
+teardown_multi_purge(Ctx) ->
+ teardown_single_purge(Ctx).
+
+
+t_no_purge_no_filter(DbName) ->
+ DocId = <<"0003">>,
+
+ {ok, OldDoc} = open_doc(DbName, DocId),
+ NewDoc = create_update(OldDoc, 2),
+
+ rpc_update_doc(DbName, NewDoc),
+
+ {ok, CurrDoc} = open_doc(DbName, DocId),
+ ?assert(CurrDoc /= OldDoc),
+ ?assert(CurrDoc == NewDoc).
+
+
+t_filter({DbName, DocId, OldDoc, _PSeq}) ->
+ ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+ create_purge_checkpoint(DbName, 0),
+
+ rpc_update_doc(DbName, OldDoc),
+
+ ?assertEqual({not_found, missing}, open_doc(DbName, DocId)).
+
+
+t_filter_unknown_node({DbName, DocId, OldDoc, _PSeq}) ->
+ % Unknown nodes are assumed to start at PurgeSeq = 0
+ ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+ create_purge_checkpoint(DbName, 0),
+
+ {Pos, [Rev | _]} = OldDoc#doc.revs,
+ RROpt = {read_repair, [{'blargh@127.0.0.1', [{Pos, Rev}]}]},
+ rpc_update_doc(DbName, OldDoc, [RROpt]),
+
+ ?assertEqual({not_found, missing}, open_doc(DbName, DocId)).
+
+
+t_no_filter_old_node({DbName, DocId, OldDoc, PSeq}) ->
+ ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+ create_purge_checkpoint(DbName, PSeq),
+
+ % The random UUID is to generate a badarg exception when
+ % we try and convert it to an existing atom.
+ create_purge_checkpoint(DbName, 0, couch_uuids:random()),
+
+ rpc_update_doc(DbName, OldDoc),
+
+ ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+t_no_filter_different_node({DbName, DocId, OldDoc, PSeq}) ->
+ ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+ create_purge_checkpoint(DbName, PSeq),
+
+ % Create a valid purge for a different node
+ TgtNode = list_to_binary(atom_to_list('notfoo@127.0.0.1')),
+ create_purge_checkpoint(DbName, 0, TgtNode),
+
+ rpc_update_doc(DbName, OldDoc),
+
+ ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+t_no_filter_after_repl({DbName, DocId, OldDoc, PSeq}) ->
+ ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+ create_purge_checkpoint(DbName, PSeq),
+
+ rpc_update_doc(DbName, OldDoc),
+
+ ?assertEqual({ok, OldDoc}, open_doc(DbName, DocId)).
+
+
+wrap({Name, Fun}) ->
+ fun(Arg) ->
+ {timeout, 60, {atom_to_list(Name), fun() ->
+ process_flag(trap_exit, true),
+ Fun(Arg)
+ end}}
+ end.
+
+
+create_db() ->
+ DbName = ?tempdb(),
+ couch_db:create(DbName, [?ADMIN_CTX]).
+
+
+populate_db(Db) ->
+ Docs = lists:map(fun(Idx) ->
+ DocId = lists:flatten(io_lib:format("~4..0b", [Idx])),
+ #doc{
+ id = list_to_binary(DocId),
+ body = {[{<<"int">>, Idx}, {<<"vsn">>, 2}]}
+ }
+ end, lists:seq(1, 100)),
+ {ok, _} = couch_db:update_docs(Db, Docs).
+
+
+open_doc(DbName, DocId) ->
+ couch_util:with_db(DbName, fun(Db) ->
+ couch_db:open_doc(Db, DocId, [])
+ end).
+
+
+create_update(Doc, NewVsn) ->
+ #doc{
+ id = DocId,
+ revs = {Pos, [Rev | _] = Revs},
+ body = {Props}
+ } = Doc,
+ NewProps = lists:keyreplace(<<"vsn">>, 1, Props, {<<"vsn">>, NewVsn}),
+ NewRev = crypto:hash(md5, term_to_binary({DocId, Rev, {NewProps}})),
+ Doc#doc{
+ revs = {Pos + 1, [NewRev | Revs]},
+ body = {NewProps}
+ }.
+
+
+purge_doc(DbName, DocId) ->
+ {ok, Doc} = open_doc(DbName, DocId),
+ {Pos, [Rev | _]} = Doc#doc.revs,
+ PInfo = {couch_uuids:random(), DocId, [{Pos, Rev}]},
+ Resp = couch_util:with_db(DbName, fun(Db) ->
+ couch_db:purge_docs(Db, [PInfo], [])
+ end),
+ ?assertEqual({ok, [{ok, [{Pos, Rev}]}]}, Resp).
+
+
+create_purge_checkpoint(DbName, PurgeSeq) ->
+ create_purge_checkpoint(DbName, PurgeSeq, tgt_node_bin()).
+
+
+create_purge_checkpoint(DbName, PurgeSeq, TgtNode) when is_binary(TgtNode) ->
+ Resp = couch_util:with_db(DbName, fun(Db) ->
+ SrcUUID = couch_db:get_uuid(Db),
+ TgtUUID = couch_uuids:random(),
+ CPDoc = #doc{
+ id = mem3_rep:make_purge_id(SrcUUID, TgtUUID),
+ body = {[
+ {<<"target_node">>, TgtNode},
+ {<<"purge_seq">>, PurgeSeq}
+ ]}
+ },
+ couch_db:update_docs(Db, [CPDoc], [])
+ end),
+ ?assertMatch({ok, [_]}, Resp).
+
+
+rpc_update_doc(DbName, Doc) ->
+ {Pos, [Rev | _]} = Doc#doc.revs,
+ RROpt = {read_repair, [{tgt_node(), [{Pos, Rev}]}]},
+ rpc_update_doc(DbName, Doc, [RROpt]).
+
+
+rpc_update_doc(DbName, Doc, Opts) ->
+ Ref = erlang:make_ref(),
+ put(rexi_from, {self(), Ref}),
+ fabric_rpc:update_docs(DbName, [Doc], Opts),
+ Reply = test_util:wait(fun() ->
+ receive
+ {Ref, Reply} ->
+ Reply
+ after 0 ->
+ wait
+ end
+ end),
+ ?assertEqual({ok, []}, Reply).
+
+
+tgt_node() ->
+ 'foo@127.0.0.1'.
+
+
+tgt_node_bin() ->
+ iolist_to_binary(atom_to_list(tgt_node())).
\ No newline at end of file