author    Mayya Sharipova <mayyas@ca.ibm.com>    2017-05-01 14:36:39 -0400
committer Paul J. Davis <paul.joseph.davis@gmail.com>    2018-03-13 15:49:51 -0500
commit    d89a79a78f5c4ef293a94339a5848afdd58c1bc1
tree      1fec382b10f159b7a89e3d5ac14b180ad6da7df9
parent    3c8e5927950ae95dd3c6ed40916c3e6bcb0361ef
Add internal replication of purge requests
* Add initial pull replication of purge requests

  - Each internal replication job starts by pulling purge requests from
    the target and applying them on the source. If a source and target
    were disconnected during a purge request, it's possible that the
    target received a purge request that is not yet present on the
    source. Given that internal replication is push-oriented, the source
    and target could reconnect and the source could push a revision that
    has since been purged. To avoid this, we pull purge requests from
    the target to ensure we're up to date before beginning internal
    replication.

  - Add _local/purge-mem3-$hash docs in mem3_rep. mem3 writes a
    _local/purge-mem3-$hash document once purge requests have been
    replicated. This document lives on the target, and its purge_seq
    value is the target's purge_seq that was processed during *pull*
    replication.

* Add push replication of purge requests

  - Push new purge requests from the source to the target, and apply
    them on the target.
  - Update checkpoint docs to store the source's purge_seq.

COUCHDB-3326
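For illustration, a sketch of the purge checkpoint document that
save_purge_checkpoint_rpc/4 (in the diff below) writes on the target once
purge requests have been pulled; the doc id and all values here are
hypothetical examples:

    %% Sketch only: field names follow save_purge_checkpoint_rpc/4 below;
    %% the id and the values are hypothetical.
    DocId = <<"_local/purge-v1-mem3-a1b2c3d4-e5f6a7b8">>,
    Body = {[
        {<<"purge_seq">>, 42},    % target purge_seq processed during pull
        {<<"timestamp_utc">>, <<"2018-03-13T20:49:51Z">>},
        {<<"verify_module">>, <<"mem3_rep">>},
        {<<"verify_function">>, <<"mem3_sync_purge">>},
        {<<"verify_options">>, {[{<<"node">>, 'node1@127.0.0.1'}]}},
        {<<"type">>, <<"internal_replication">>}
    ]}.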
-rw-r--r--  src/mem3/src/mem3_rep.erl | 142
-rw-r--r--  src/mem3/src/mem3_rpc.erl | 128
2 files changed, 252 insertions(+), 18 deletions(-)
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 942f8a8e0..44aee49cd 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -17,7 +17,9 @@
go/2,
go/3,
make_local_id/2,
- find_source_seq/4
+ make_local_purge_id/2,
+ find_source_seq/4,
+ mem3_sync_purge/1
]).
-export([
@@ -39,7 +41,8 @@
target,
filter,
db,
- history = {[]}
+ history = {[]},
+ purge_seq = 0
}).
@@ -119,6 +122,12 @@ make_local_id(SourceThing, TargetThing, Filter) ->
<<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
+make_local_purge_id(SourceUUID, TargetUUID) ->
+ V = ?l2b("v" ++ config:get("purge", "version", "1") ++ "-"),
+ <<"_local/purge-", V/binary, "mem3-",
+ SourceUUID/binary, "-", TargetUUID/binary>>.
+
+
%% @doc Find and return the largest update_seq in SourceDb
%% that the client has seen from TargetNode.
%%
@@ -172,18 +181,58 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
repl(Db, Acc0) ->
erlang:put(io_priority, {internal_repl, couch_db:name(Db)}),
- #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
- case Seq >= couch_db:get_update_seq(Db) of
- true ->
- {ok, 0};
- false ->
- Fun = fun ?MODULE:changes_enumerator/2,
- {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
- {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
- {ok, couch_db:count_changes_since(Db, LastSeq)}
+ #acc{source = Db2} = Acc1 = pull_purges_from_target(Db, Acc0),
+ #acc{seq=Seq} = Acc2 = calculate_start_seq(Acc1),
+ try
+ % replicate_purged_docs/1 may throw {invalid_start_purge_seq, PurgeSeq}
+ Acc3 = replicate_purged_docs(Acc2),
+ Fun = fun ?MODULE:changes_enumerator/2,
+ {ok, Acc4} = couch_db:fold_changes(Db2, Seq, Fun, Acc3),
+ {ok, #acc{seq = LastSeq}} = replicate_batch(Acc4),
+ {ok, couch_db:count_changes_since(Db2, LastSeq)}
+ catch
+ throw:{invalid_start_purge_seq, PurgeSeq} ->
+ couch_log:error(
+ "Oldest_purge_seq on source is greated than "
+ "the last source's purge_seq: ~p known to target!"
+ "Can't synchronize purges between: ~p and ~p!",
+ [PurgeSeq, Acc2#acc.source, Acc2#acc.target]
+ )
end.
+pull_purges_from_target(Db, #acc{target=#shard{node=TNode, name=DbName}}=Acc) ->
+ SourceUUID = couch_db:get_uuid(Db),
+ {TUUIDsIdsRevs, TargetPDocID, TargetPSeq} =
+ mem3_rpc:load_purges(TNode, DbName, SourceUUID),
+ Acc2 = case TUUIDsIdsRevs of
+ [] -> Acc#acc{source = Db};
+ _ ->
+ % check which Target UUIDs have not been applied to Source
+ UUIDs = [UUID || {UUID, _Id, _Revs} <- TUUIDsIdsRevs],
+ PurgedDocs = couch_db:open_purged_docs(Db, UUIDs),
+ Results = lists:zip(TUUIDsIdsRevs, PurgedDocs),
+ Unapplied = lists:filtermap(fun
+ ({UUIDIdRevs, not_found}) -> {true, UUIDIdRevs};
+ (_) -> false
+ end, Results),
+ Acc1 = case Unapplied of
+ [] -> Acc#acc{source = Db};
+ _ ->
+ % purge Db on Source and reopen it
+ couch_db:purge_docs(Db, Unapplied),
+ couch_db:close(Db),
+ {ok, Db2} = couch_db:open(DbName, [?ADMIN_CTX]),
+ Acc#acc{source = Db2}
+ end,
+ % record on Target the target purge_seq that Source has processed
+ mem3_rpc:save_purge_checkpoint(TNode, DbName, TargetPDocID,
+ TargetPSeq, node()),
+ Acc1
+ end,
+ Acc2.
+
+
calculate_start_seq(Acc) ->
#acc{
source = Db,
@@ -215,7 +264,33 @@ calculate_start_seq(Acc) ->
Seq = TargetSeq,
History = couch_util:get_value(<<"history">>, TProps, {[]})
end,
- Acc1#acc{seq = Seq, history = History};
+ SourcePurgeSeq0 = couch_util:get_value(<<"purge_seq">>, SProps),
+ TargetPurgeSeq0 = couch_util:get_value(<<"purge_seq">>, TProps),
+ % Before the purge upgrade, purge_seq was not stored in checkpoint
+ % docs, so fall back to reading it directly from the databases.
+ SourcePurgeSeq = case is_integer(SourcePurgeSeq0) of
+ true ->
+ SourcePurgeSeq0;
+ false ->
+ {ok, SPS} = couch_db:get_purge_seq(Db),
+ SPS
+ end,
+ TargetPurgeSeq = case is_integer(TargetPurgeSeq0) of
+ true ->
+ TargetPurgeSeq0;
+ false ->
+ mem3_rpc:get_purge_seq(Node, Name, [
+ {io_priority, {internal_repl, Name}},
+ ?ADMIN_CTX
+ ])
+ end,
+ case SourcePurgeSeq =< TargetPurgeSeq of
+ true ->
+ PurgeSeq = SourcePurgeSeq;
+ false ->
+ PurgeSeq = TargetPurgeSeq
+ end,
+ Acc1#acc{seq = Seq, history = History, purge_seq = PurgeSeq};
{not_found, _} ->
compare_epochs(Acc1)
end.
@@ -251,6 +326,27 @@ changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) ->
{Go, Acc1}.
+replicate_purged_docs(Acc0) ->
+ #acc{
+ source = Db,
+ target = #shard{node=Node, name=Name},
+ purge_seq = PurgeSeq0
+ } = Acc0,
+ PFoldFun = fun({_PSeq, UUID, Id, Revs}, Acc) ->
+ [{UUID, Id, Revs} | Acc]
+ end,
+
+ {ok, UUIDsIdsRevs} = couch_db:fold_purged_docs(Db, PurgeSeq0, PFoldFun, [], []),
+ case UUIDsIdsRevs of
+ [] ->
+ Acc0;
+ _ ->
+ ok = purge_on_target(Node, Name, UUIDsIdsRevs),
+ {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+ Acc0#acc{purge_seq = PurgeSeq}
+ end.
+
+
replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
case find_missing_revs(Acc) of
[] ->
@@ -324,8 +420,19 @@ save_on_target(Node, Name, Docs) ->
ok.
+purge_on_target(Node, Name, UUIDsIdsRevs) ->
+ mem3_rpc:purge_docs(Node, Name, UUIDsIdsRevs, [
+ replicated_changes,
+ full_commit,
+ ?ADMIN_CTX,
+ {io_priority, {internal_repl, Name}}
+ ]),
+ ok.
+
+
update_locals(Acc) ->
- #acc{seq=Seq, source=Db, target=Target, localid=Id, history=History} = Acc,
+ #acc{seq=Seq, source=Db, target=Target, localid=Id,
+ history=History, purge_seq = PurgeSeq} = Acc,
#shard{name=Name, node=Node} = Target,
NewEntry = [
{<<"source_node">>, atom_to_binary(node(), utf8)},
@@ -333,7 +440,8 @@ update_locals(Acc) ->
{<<"source_seq">>, Seq},
{<<"timestamp">>, list_to_binary(iso8601_timestamp())}
],
- NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History),
+ NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, PurgeSeq,
+ NewEntry, History),
{ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
@@ -369,6 +477,12 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
end.
+% Used during compaction to check whether a _local/purge doc is current.
+mem3_sync_purge(Opts) ->
+ Node = couch_util:get_value(<<"node">>, Opts),
+ lists:member(Node, mem3:nodes()).
+
+
is_prefix(Prefix, Subject) ->
binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
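As a quick usage sketch, make_local_purge_id/2 above yields doc ids of the
following shape (UUIDs are hypothetical; assumes the default
"purge"/"version" config value of "1"):

    1> mem3_rep:make_local_purge_id(<<"a1b2c3d4">>, <<"e5f6a7b8">>).
    <<"_local/purge-v1-mem3-a1b2c3d4-e5f6a7b8">>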
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index c2bd58fdf..b9c1b390b 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -19,15 +19,22 @@
find_common_seq/4,
get_missing_revs/4,
update_docs/4,
+ get_purge_seq/3,
+ purge_docs/4,
load_checkpoint/4,
- save_checkpoint/6
+ save_checkpoint/7,
+ load_purges/3,
+ save_purge_checkpoint/5
]).
% Private RPC callbacks
-export([
find_common_seq_rpc/3,
load_checkpoint_rpc/3,
- save_checkpoint_rpc/5
+ save_checkpoint_rpc/5,
+ save_checkpoint_rpc/6,
+ load_purges_rpc/2,
+ save_purge_checkpoint_rpc/4
]).
@@ -43,16 +50,34 @@ update_docs(Node, DbName, Docs, Options) ->
rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
+get_purge_seq(Node, DbName, Options) ->
+ rexi_call(Node, {fabric_rpc, get_purge_seq, [DbName, Options]}).
+
+
+purge_docs(Node, DbName, PUUIdsIdsRevs, Options) ->
+ rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PUUIdsIdsRevs, Options]}).
+
+
load_checkpoint(Node, DbName, SourceNode, SourceUUID) ->
Args = [DbName, SourceNode, SourceUUID],
rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
-save_checkpoint(Node, DbName, DocId, Seq, Entry, History) ->
- Args = [DbName, DocId, Seq, Entry, History],
+save_checkpoint(Node, DbName, DocId, Seq, PurgeSeq, Entry, History) ->
+ Args = [DbName, DocId, Seq, PurgeSeq, Entry, History],
rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}).
+load_purges(Node, DbName, SourceUUID) ->
+ Args = [DbName, SourceUUID],
+ rexi_call(Node, {mem3_rpc, load_purges_rpc, Args}).
+
+
+save_purge_checkpoint(Node, DbName, DocId, PurgeSeq, SourceNode) ->
+ Args = [DbName, DocId, PurgeSeq, SourceNode],
+ rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}).
+
+
find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
Args = [DbName, SourceUUID, SourceEpochs],
rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
@@ -81,6 +106,7 @@ load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
end.
+% Remove after all nodes in the cluster are upgraded to clustered purge.
save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
erlang:put(io_priority, {internal_repl, DbName}),
case get_or_create_db(DbName, [?ADMIN_CTX]) of
@@ -111,6 +137,40 @@ save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
rexi:reply(Error)
end.
+
+save_checkpoint_rpc(DbName, Id, SourceSeq, SourcePurgeSeq,
+ NewEntry0, History0) ->
+ erlang:put(io_priority, {internal_repl, DbName}),
+ case get_or_create_db(DbName, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ NewEntry = {[
+ {<<"target_node">>, atom_to_binary(node(), utf8)},
+ {<<"target_uuid">>, couch_db:get_uuid(Db)},
+ {<<"target_seq">>, couch_db:get_update_seq(Db)}
+ ] ++ NewEntry0},
+ Body = {[
+ {<<"seq">>, SourceSeq},
+ {<<"purge_seq">>, SourcePurgeSeq},
+ {<<"target_uuid">>, couch_db:get_uuid(Db)},
+ {<<"history">>, add_checkpoint(NewEntry, History0)}
+ ]},
+ Doc = #doc{id = Id, body = Body},
+ rexi:reply(try couch_db:update_doc(Db, Doc, []) of
+ {ok, _} ->
+ {ok, Body};
+ Else ->
+ {error, Else}
+ catch
+ Exception ->
+ Exception;
+ error:Reason ->
+ {error, Reason}
+ end);
+ Error ->
+ rexi:reply(Error)
+ end.
+
+
find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
erlang:put(io_priority, {internal_repl, DbName}),
case get_or_create_db(DbName, [?ADMIN_CTX]) of
@@ -128,6 +188,66 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
end.
+load_purges_rpc(DbName, SourceUUID) ->
+ erlang:put(io_priority, {internal_repl, DbName}),
+ case get_or_create_db(DbName, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ TargetUUID = couch_db:get_uuid(Db),
+ DocId = mem3_rep:make_local_purge_id(SourceUUID, TargetUUID),
+ LastPSeq = case couch_db:open_doc(Db, DocId, []) of
+ {ok, #doc{body={Props}} } ->
+ couch_util:get_value(<<"purge_seq">>, Props);
+ {not_found, _} ->
+ % no purge checkpoint doc yet; start just before the oldest
+ % purge_seq still available on this db
+ {ok, OldestPSeq} = couch_db:get_oldest_purge_seq(Db),
+ erlang:max(OldestPSeq-1, 0)
+ end,
+ {ok, CurPSeq} = couch_db:get_purge_seq(Db),
+ UUIDsIdsRevs = if (LastPSeq == CurPSeq) -> []; true ->
+ FoldFun = fun({_PSeq, UUID, Id, Revs}, Acc) ->
+ [{UUID, Id, Revs} | Acc]
+ end,
+ {ok, UUIDsIdsRevs0} = couch_db:fold_purged_docs(
+ Db, LastPSeq, FoldFun, [], []
+ ),
+ UUIDsIdsRevs0
+ end,
+ rexi:reply({ok, {UUIDsIdsRevs, DocId, CurPSeq}});
+ Error ->
+ rexi:reply(Error)
+ end.
+
+
+save_purge_checkpoint_rpc(DbName, Id, PurgeSeq, Node) ->
+ erlang:put(io_priority, {internal_repl, DbName}),
+ case get_or_create_db(DbName, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ Timestamp = couch_util:utc_string(),
+ Body = {[
+ {<<"purge_seq">>, PurgeSeq},
+ {<<"timestamp_utc">>, Timestamp},
+ {<<"verify_module">>, <<"mem3_rep">>},
+ {<<"verify_function">>, <<"mem3_sync_purge">>},
+ {<<"verify_options">>, {[{<<"node">>, Node}]}},
+ {<<"type">>, <<"internal_replication">>}
+ ]},
+ Doc = #doc{id = Id, body = Body},
+ rexi:reply(try couch_db:update_doc(Db, Doc, []) of
+ {ok, _} ->
+ {ok, Body};
+ Else ->
+ {error, Else}
+ catch
+ Exception ->
+ Exception;
+ error:Reason ->
+ {error, Reason}
+ end);
+ Error ->
+ rexi:reply(Error)
+ end.
+
+
%% @doc Return the sequence where two files with the same UUID diverged.
compare_epochs(SourceEpochs, TargetEpochs) ->
compare_rev_epochs(
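The verify_module/verify_function/verify_options fields in the purge
checkpoint doc let a later reader, such as compaction deciding whether the
doc is still needed, call back into mem3_rep:mem3_sync_purge/1. A minimal
sketch of such a caller, assuming the body shape written by
save_purge_checkpoint_rpc/4; verify_purge_doc/1 is a hypothetical helper,
not part of this commit:

    %% Hypothetical helper (not in this commit): given the EJSON body of a
    %% _local/purge-* checkpoint doc, invoke the verify callback it names,
    %% e.g. mem3_rep:mem3_sync_purge([{<<"node">>, Node}]).
    verify_purge_doc({Props}) ->
        Mod = binary_to_existing_atom(
            couch_util:get_value(<<"verify_module">>, Props), latin1),
        Fun = binary_to_existing_atom(
            couch_util:get_value(<<"verify_function">>, Props), latin1),
        {Opts} = couch_util:get_value(<<"verify_options">>, Props),
        Mod:Fun(Opts).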