diff options
author | Mayya Sharipova <mayyas@ca.ibm.com> | 2017-05-01 14:36:39 -0400 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2018-03-13 15:49:51 -0500 |
commit | d89a79a78f5c4ef293a94339a5848afdd58c1bc1 (patch) | |
tree | 1fec382b10f159b7a89e3d5ac14b180ad6da7df9 | |
parent | 3c8e5927950ae95dd3c6ed40916c3e6bcb0361ef (diff) | |
download | couchdb-d89a79a78f5c4ef293a94339a5848afdd58c1bc1.tar.gz |
Add internal replication of purge requests
* Add initial pull replication of purge requests
- Each internal replication job starts by pulling purge requests from
target and applying them on source. If a source and target were
disconnected during a purge request, it's possible that the target
has received a purge request not yet present on the source. Given
that internal replication is push oriented it would be possible for the
source and target to reconnect and have the source push a revision that
has since been purged. To avoid this we should pull purge requests
from the target to ensure we're up to date before beginning internal
replication.
- Add _local/purge-mem3-$hash docs in mem3_rep. mem3 writes a
_local/purge-mem3-$hash document once purge requests have been
replicated. This document will exist on the target and the
purge_seq value will be the target's purge_seq that has been processed
during *pull* replication.
* Add push replication of purge requests
- Push new purge requests from source to target, and apply them on
target
- Update checkpoint docs to store the purge_seq on source
COUCHDB-3326
-rw-r--r-- | src/mem3/src/mem3_rep.erl | 142 | ||||
-rw-r--r-- | src/mem3/src/mem3_rpc.erl | 128 |
2 files changed, 252 insertions, 18 deletions
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index 942f8a8e0..44aee49cd 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -17,7 +17,9 @@ go/2, go/3, make_local_id/2, - find_source_seq/4 + make_local_purge_id/2, + find_source_seq/4, + mem3_sync_purge/1 ]). -export([ @@ -39,7 +41,8 @@ target, filter, db, - history = {[]} + history = {[]}, + purge_seq = 0 }). @@ -119,6 +122,12 @@ make_local_id(SourceThing, TargetThing, Filter) -> <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>. +make_local_purge_id(SourceUUID, TargetUUID) -> + V = ?l2b("v" ++ config:get("purge", "version", "1") ++ "-"), + <<"_local/purge-", V/binary, "mem3-", + SourceUUID/binary, "-", TargetUUID/binary>>. + + %% @doc Find and return the largest update_seq in SourceDb %% that the client has seen from TargetNode. %% @@ -172,18 +181,58 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) -> repl(Db, Acc0) -> erlang:put(io_priority, {internal_repl, couch_db:name(Db)}), - #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}), - case Seq >= couch_db:get_update_seq(Db) of - true -> - {ok, 0}; - false -> - Fun = fun ?MODULE:changes_enumerator/2, - {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1), - {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2), - {ok, couch_db:count_changes_since(Db, LastSeq)} + #acc{source = Db2} = Acc1 = pull_purges_from_target(Db, Acc0), + #acc{seq=Seq} = Acc2 = calculate_start_seq(Acc1), + try + % this throws an exception: {invalid_start_purge_seq, PurgeSeq0} + Acc3 = replicate_purged_docs(Acc2), + Fun = fun ?MODULE:changes_enumerator/2, + {ok, Acc4} = couch_db:fold_changes(Db2, Seq, Fun, Acc3), + {ok, #acc{seq = LastSeq}} = replicate_batch(Acc4), + {ok, couch_db:count_changes_since(Db2, LastSeq)} + catch + throw:{invalid_start_purge_seq, PurgeSeq} -> + couch_log:error( + "Oldest_purge_seq on source is greated than " + "the last source's purge_seq: ~p known to target!" + "Can't synchronize purges between: ~p and ~p!", + [PurgeSeq, Acc2#acc.source, Acc2#acc.target] + ) end. +pull_purges_from_target(Db, #acc{target=#shard{node=TNode, name=DbName}}=Acc) -> + SourceUUID = couch_db:get_uuid(Db), + {TUUIDsIdsRevs, TargetPDocID, TargetPSeq} = + mem3_rpc:load_purges(TNode, DbName, SourceUUID), + Acc2 = case TUUIDsIdsRevs of + [] -> Acc#acc{source = Db}; + _ -> + % check which Target UUIDs have not been applied to Source + UUIDs = [UUID || {UUID, _Id, _Revs} <- TUUIDsIdsRevs], + PurgedDocs = couch_db:open_purged_docs(Db, UUIDs), + Results = lists:zip(TUUIDsIdsRevs, PurgedDocs), + Unapplied = lists:filtermap(fun + ({UUIDIdRevs, not_found}) -> {true, UUIDIdRevs}; + (_) -> false + end, Results), + Acc1 = case Unapplied of + [] -> Acc#acc{source = Db}; + _ -> + % purge Db on Source and reopen it + couch_db:purge_docs(Db, Unapplied), + couch_db:close(Db), + {ok, Db2} = couch_db:open(DbName, [?ADMIN_CTX]), + Acc#acc{source = Db2} + end, + % update on Target target_purge_seq known to Source + mem3_rpc:save_purge_checkpoint(TNode, DbName, TargetPDocID, + TargetPSeq, node()), + Acc1 + end, + Acc2. + + calculate_start_seq(Acc) -> #acc{ source = Db, @@ -215,7 +264,33 @@ calculate_start_seq(Acc) -> Seq = TargetSeq, History = couch_util:get_value(<<"history">>, TProps, {[]}) end, - Acc1#acc{seq = Seq, history = History}; + SourcePurgeSeq0 = couch_util:get_value(<<"purge_seq">>, SProps), + TargetPurgeSeq0 = couch_util:get_value(<<"purge_seq">>, TProps), + % before purge upgrade, purge_seq was not saved in checkpoint file, + % thus get purge_seq directly from dbs + SourcePurgeSeq = case is_integer(SourcePurgeSeq0) of + true -> + SourcePurgeSeq0; + false -> + {ok, SPS} = couch_db:get_purge_seq(Db), + SPS + end, + TargetPurgeSeq = case is_integer(TargetPurgeSeq0) of + true -> + TargetPurgeSeq0; + false -> + mem3_rpc:get_purge_seq(Node, Name, [ + {io_priority, {internal_repl, Name}}, + ?ADMIN_CTX + ]) + end, + case SourcePurgeSeq =< TargetPurgeSeq of + true -> + PurgeSeq = SourcePurgeSeq; + false -> + PurgeSeq = TargetPurgeSeq + end, + Acc1#acc{seq = Seq, history = History, purge_seq = PurgeSeq}; {not_found, _} -> compare_epochs(Acc1) end. @@ -251,6 +326,27 @@ changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) -> {Go, Acc1}. +replicate_purged_docs(Acc0) -> + #acc{ + source = Db, + target = #shard{node=Node, name=Name}, + purge_seq = PurgeSeq0 + } = Acc0, + PFoldFun = fun({_PSeq, UUID, Id, Revs}, Acc) -> + [{UUID, Id, Revs} | Acc] + end, + + {ok, UUIDsIdsRevs} = couch_db:fold_purged_docs(Db, PurgeSeq0, PFoldFun, [], []), + case UUIDsIdsRevs of + [] -> + Acc0; + _ -> + ok = purge_on_target(Node, Name, UUIDsIdsRevs), + {ok, PurgeSeq} = couch_db:get_purge_seq(Db), + Acc0#acc{purge_seq = PurgeSeq} + end. + + replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) -> case find_missing_revs(Acc) of [] -> @@ -324,8 +420,19 @@ save_on_target(Node, Name, Docs) -> ok. +purge_on_target(Node, Name, UUIdsIdsRevs) -> + mem3_rpc:purge_docs(Node, Name, UUIdsIdsRevs,[ + replicated_changes, + full_commit, + ?ADMIN_CTX, + {io_priority, {internal_repl, Name}} + ]), + ok. + + update_locals(Acc) -> - #acc{seq=Seq, source=Db, target=Target, localid=Id, history=History} = Acc, + #acc{seq=Seq, source=Db, target=Target, localid=Id, + history=History, purge_seq = PurgeSeq} = Acc, #shard{name=Name, node=Node} = Target, NewEntry = [ {<<"source_node">>, atom_to_binary(node(), utf8)}, @@ -333,7 +440,8 @@ update_locals(Acc) -> {<<"source_seq">>, Seq}, {<<"timestamp">>, list_to_binary(iso8601_timestamp())} ], - NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History), + NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, PurgeSeq, + NewEntry, History), {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []). @@ -369,6 +477,12 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) -> end. +% used during compaction to check if _local/purge doc is current +mem3_sync_purge(Opts)-> + Node = couch_util:get_value(<<"node">>, Opts), + lists:member(Node, mem3:nodes()). + + is_prefix(Prefix, Subject) -> binary:longest_common_prefix([Prefix, Subject]) == size(Prefix). diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index c2bd58fdf..b9c1b390b 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -19,15 +19,22 @@ find_common_seq/4, get_missing_revs/4, update_docs/4, + get_purge_seq/3, + purge_docs/4, load_checkpoint/4, - save_checkpoint/6 + save_checkpoint/7, + load_purges/3, + save_purge_checkpoint/5 ]). % Private RPC callbacks -export([ find_common_seq_rpc/3, load_checkpoint_rpc/3, - save_checkpoint_rpc/5 + save_checkpoint_rpc/5, + save_checkpoint_rpc/6, + load_purges_rpc/2, + save_purge_checkpoint_rpc/4 ]). @@ -43,16 +50,34 @@ update_docs(Node, DbName, Docs, Options) -> rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}). +get_purge_seq(Node, DbName, Options) -> + rexi_call(Node, {fabric_rpc, get_purge_seq, [DbName, Options]}). + + +purge_docs(Node, DbName, PUUIdsIdsRevs, Options) -> + rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PUUIdsIdsRevs, Options]}). + + load_checkpoint(Node, DbName, SourceNode, SourceUUID) -> Args = [DbName, SourceNode, SourceUUID], rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}). -save_checkpoint(Node, DbName, DocId, Seq, Entry, History) -> - Args = [DbName, DocId, Seq, Entry, History], +save_checkpoint(Node, DbName, DocId, Seq, PurgeSeq, Entry, History) -> + Args = [DbName, DocId, Seq, PurgeSeq, Entry, History], rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}). +load_purges(Node, DbName, SourceUUID) -> + Args = [DbName, SourceUUID], + rexi_call(Node, {mem3_rpc, load_purges_rpc, Args}). + + +save_purge_checkpoint(Node, DbName, DocId, PurgeSeq, SourceNode) -> + Args = [DbName, DocId, PurgeSeq, SourceNode], + rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}). + + find_common_seq(Node, DbName, SourceUUID, SourceEpochs) -> Args = [DbName, SourceUUID, SourceEpochs], rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}). @@ -81,6 +106,7 @@ load_checkpoint_rpc(DbName, SourceNode, SourceUUID) -> end. +% Remove after all nodes in the cluster are upgrades to clustered purge save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) -> erlang:put(io_priority, {internal_repl, DbName}), case get_or_create_db(DbName, [?ADMIN_CTX]) of @@ -111,6 +137,40 @@ save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) -> rexi:reply(Error) end. + +save_checkpoint_rpc(DbName, Id, SourceSeq, SourcePurgeSeq, + NewEntry0, History0) -> + erlang:put(io_priority, {internal_repl, DbName}), + case get_or_create_db(DbName, [?ADMIN_CTX]) of + {ok, Db} -> + NewEntry = {[ + {<<"target_node">>, atom_to_binary(node(), utf8)}, + {<<"target_uuid">>, couch_db:get_uuid(Db)}, + {<<"target_seq">>, couch_db:get_update_seq(Db)} + ] ++ NewEntry0}, + Body = {[ + {<<"seq">>, SourceSeq}, + {<<"purge_seq">>, SourcePurgeSeq}, + {<<"target_uuid">>, couch_db:get_uuid(Db)}, + {<<"history">>, add_checkpoint(NewEntry, History0)} + ]}, + Doc = #doc{id = Id, body = Body}, + rexi:reply(try couch_db:update_doc(Db, Doc, []) of + {ok, _} -> + {ok, Body}; + Else -> + {error, Else} + catch + Exception -> + Exception; + error:Reason -> + {error, Reason} + end); + Error -> + rexi:reply(Error) + end. + + find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) -> erlang:put(io_priority, {internal_repl, DbName}), case get_or_create_db(DbName, [?ADMIN_CTX]) of @@ -128,6 +188,66 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) -> end. +load_purges_rpc(DbName, SourceUUID) -> + erlang:put(io_priority, {internal_repl, DbName}), + case get_or_create_db(DbName, [?ADMIN_CTX]) of + {ok, Db} -> + TargetUUID = couch_db:get_uuid(Db), + DocId = mem3_rep:make_local_purge_id(SourceUUID, TargetUUID), + LastPSeq = case couch_db:open_doc(Db, DocId, []) of + {ok, #doc{body={Props}} } -> + couch_util:get_value(<<"purge_seq">>, Props); + {not_found, _} -> + % synchronize only last purge + {ok, OldestPSeq} = couch_db:get_oldest_purge_seq(Db), + erlang:max(OldestPSeq-1, 0) + end, + {ok, CurPSeq} = couch_db:get_purge_seq(Db), + UUIDsIdsRevs = if (LastPSeq == CurPSeq) -> []; true -> + FoldFun = fun({_PSeq, UUID, Id, Revs}, Acc) -> + [{UUID, Id, Revs} | Acc] + end, + {ok, UUIDsIdsRevs0} = couch_db:fold_purged_docs( + Db, LastPSeq, FoldFun, [], [] + ), + UUIDsIdsRevs0 + end, + rexi:reply({ok, {UUIDsIdsRevs, DocId, CurPSeq}}); + Error -> + rexi:reply(Error) + end. + + +save_purge_checkpoint_rpc(DbName, Id, PurgeSeq, Node) -> + erlang:put(io_priority, {internal_repl, DbName}), + case get_or_create_db(DbName, [?ADMIN_CTX]) of + {ok, Db} -> + Timestamp = couch_util:utc_string(), + Body = {[ + {<<"purge_seq">>, PurgeSeq}, + {<<"timestamp_utc">>, Timestamp}, + {<<"verify_module">>, <<"mem3_rep">>}, + {<<"verify_function">>, <<"mem3_sync_purge">>}, + {<<"verify_options">>, {[{<<"node">>, Node}]}}, + {<<"type">>, <<"internal_replication">>} + ]}, + Doc = #doc{id = Id, body = Body}, + rexi:reply(try couch_db:update_doc(Db, Doc, []) of + {ok, _} -> + {ok, Body}; + Else -> + {error, Else} + catch + Exception -> + Exception; + error:Reason -> + {error, Reason} + end); + Error -> + rexi:reply(Error) + end. + + %% @doc Return the sequence where two files with the same UUID diverged. compare_epochs(SourceEpochs, TargetEpochs) -> compare_rev_epochs( |