summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2018-04-24 12:24:10 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2018-04-26 13:58:35 -0500
commitd16071de595270b401100a2c395148896c9459b7 (patch)
tree1798cb218889e97ae48e86621d15449432aa2fd3
parent78d7e1f64fa03113ee2bac34f75d58afd895575c (diff)
downloadcouchdb-d16071de595270b401100a2c395148896c9459b7.tar.gz
Implement new node local purge APIs
Rewrite purge logic to use the new couch_db_engine purge APIs. This work will allow for the new purge behaviors to enable clustered purge.
-rw-r--r--src/couch/priv/stats_descriptions.cfg12
-rw-r--r--src/couch/src/couch_db.erl153
-rw-r--r--src/couch/src/couch_db_updater.erl164
-rw-r--r--src/couch/src/couch_httpd_db.erl23
4 files changed, 258 insertions, 94 deletions
diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg
index f0919782e..bceb0cea8 100644
--- a/src/couch/priv/stats_descriptions.cfg
+++ b/src/couch/priv/stats_descriptions.cfg
@@ -34,6 +34,10 @@
{type, counter},
{desc, <<"number of times a document was read from a database">>}
]}.
+{[couchdb, database_purges], [
+ {type, counter},
+ {desc, <<"number of times a database was purged">>}
+]}.
{[couchdb, db_open_time], [
{type, histogram},
{desc, <<"milliseconds required to open a database">>}
@@ -46,6 +50,10 @@
{type, counter},
{desc, <<"number of document write operations">>}
]}.
+{[couchdb, document_purges], [
+ {type, counter},
+ {desc, <<"number of document purge operations">>}
+]}.
{[couchdb, local_document_writes], [
{type, counter},
{desc, <<"number of _local document write operations">>}
@@ -74,6 +82,10 @@
{type, counter},
{desc, <<"number of clients for continuous _changes">>}
]}.
+{[couchdb, httpd, purge_requests], [
+ {type, counter},
+ {desc, <<"number of purge requests">>}
+]}.
{[couchdb, httpd_request_methods, 'COPY'], [
{type, counter},
{desc, <<"number of HTTP COPY requests">>}
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 3449274f6..8592193d4 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -43,7 +43,6 @@
get_epochs/1,
get_filepath/1,
get_instance_start_time/1,
- get_last_purged/1,
get_pid/1,
get_revs_limit/1,
get_security/1,
@@ -51,12 +50,15 @@
get_user_ctx/1,
get_uuid/1,
get_purge_seq/1,
+ get_oldest_purge_seq/1,
+ get_purge_infos_limit/1,
is_db/1,
is_system_db/1,
is_clustered/1,
set_revs_limit/2,
+ set_purge_infos_limit/2,
set_security/2,
set_user_ctx/2,
@@ -75,6 +77,9 @@
get_full_doc_infos/2,
get_missing_revs/2,
get_design_docs/1,
+ get_purge_infos/2,
+
+ get_minimum_purge_seq/1,
update_doc/3,
update_doc/4,
@@ -84,6 +89,7 @@
delete_doc/3,
purge_docs/2,
+ purge_docs/3,
with_stream/3,
open_write_stream/2,
@@ -97,6 +103,8 @@
fold_changes/4,
fold_changes/5,
count_changes_since/2,
+ fold_purge_infos/4,
+ fold_purge_infos/5,
calculate_start_seq/3,
owner_of/2,
@@ -369,8 +377,132 @@ get_full_doc_info(Db, Id) ->
get_full_doc_infos(Db, Ids) ->
couch_db_engine:open_docs(Db, Ids).
-purge_docs(#db{main_pid=Pid}, IdsRevs) ->
- gen_server:call(Pid, {purge_docs, IdsRevs}).
+purge_docs(Db, IdRevs) ->
+ purge_docs(Db, IdRevs, []).
+
+-spec purge_docs(#db{}, [{UUId, Id, [Rev]}], [PurgeOption]) ->
+ {ok, [Reply]} when
+ UUId :: binary(),
+ Id :: binary(),
+ Rev :: {non_neg_integer(), binary()},
+ PurgeOption :: interactive_edit | replicated_changes,
+ Reply :: {ok, []} | {ok, [Rev]}.
+purge_docs(#db{main_pid = Pid} = Db, UUIdsIdsRevs, Options) ->
+ increment_stat(Db, [couchdb, database_purges]),
+ gen_server:call(Pid, {purge_docs, UUIdsIdsRevs, Options}).
+
+-spec get_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when
+ UUId :: binary(),
+ PurgeInfo :: {PurgeSeq, UUId, Id, [Rev]} | not_found,
+ PurgeSeq :: non_neg_integer(),
+ Id :: binary(),
+ Rev :: {non_neg_integer(), binary()}.
+get_purge_infos(Db, UUIDs) ->
+ couch_db_engine:load_purge_infos(Db, UUIDs).
+
+
+get_minimum_purge_seq(#db{} = Db) ->
+ PurgeSeq = couch_db_engine:get_purge_seq(Db),
+ OldestPurgeSeq = couch_db_engine:get_oldest_purge_seq(Db),
+ PurgeInfosLimit = couch_db_engine:get_purge_infos_limit(Db),
+
+ FoldFun = fun(#doc{id = DocId, body = {Props}}, SeqAcc) ->
+ case DocId of
+ <<"_local/purge-", _/binary>> ->
+ ClientSeq = couch_util:get_value(<<"purge_seq">>, Props),
+ case ClientSeq of
+ CS when is_integer(CS), CS >= PurgeSeq - PurgeInfosLimit ->
+ {ok, SeqAcc};
+ CS when is_integer(CS) ->
+ case purge_client_exists(Db, DocId, Props) of
+ true -> {ok, erlang:min(CS, SeqAcc)};
+ false -> {ok, SeqAcc}
+ end;
+ _ ->
+ % If there's a broken doc we have to keep every
+ % purge info until the doc is fixed or removed.
+ Fmt = "Invalid purge doc '~s' with purge_seq '~w'",
+ couch_log:error(Fmt, [DocId, ClientSeq]),
+ {ok, erlang:min(OldestPurgeSeq, SeqAcc)}
+ end;
+ _ ->
+ {stop, SeqAcc}
+ end
+ end,
+ InitMinSeq = PurgeSeq - PurgeInfosLimit,
+ Opts = [
+ {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")}
+ ],
+ {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, InitMinSeq, Opts),
+ FinalSeq = case MinIdxSeq < PurgeSeq - PurgeInfosLimit of
+ true -> MinIdxSeq;
+ false -> erlang:max(0, PurgeSeq - PurgeInfosLimit)
+ end,
+ % Log a warning if we've got a purge sequence exceeding the
+ % configured threshold.
+ if FinalSeq >= (PurgeSeq - PurgeInfosLimit) -> ok; true ->
+ Fmt = "The purge sequence for '~s' exceeds configured threshold",
+ couch_log:warning(Fmt, [couch_db:name(Db)])
+ end,
+ FinalSeq.
+
+
+purge_client_exists(DbName, DocId, Props) ->
+ % Warn about clients that have not updated their purge
+ % checkpoints in the last "index_lag_warn_seconds"
+ LagWindow = config:get_integer(
+ "purge", "index_lag_warn_seconds", 86400), % Default 24 hours
+
+ {Mega, Secs, _} = os:timestamp(),
+ NowSecs = Mega * 1000000 + Secs,
+ LagThreshold = NowSecs - LagWindow,
+
+ try
+ CheckFun = get_purge_client_fun(DocId, Props),
+ Exists = CheckFun(DbName, DocId, Props),
+ if not Exists -> ok; true ->
+ Updated = couch_util:get_value(<<"updated_on">>, Props),
+        if is_integer(Updated) andalso Updated > LagThreshold -> ok; true ->
+ Diff = NowSecs - Updated,
+            Fmt = "Purge checkpoint '~s' not updated in ~p seconds",
+ couch_log:error(Fmt, [DocId, Diff])
+ end
+ end,
+ Exists
+ catch _:_ ->
+ % If we fail to check for a client we have to assume that
+ % it exists.
+ true
+ end.
+
+
+get_purge_client_fun(DocId, Props) ->
+ M0 = couch_util:get_value(<<"verify_module">>, Props),
+ M = try
+ binary_to_existing_atom(M0, latin1)
+ catch error:badarg ->
+ Fmt1 = "Missing index module '~s' for purge checkpoint '~s'",
+ couch_log:error(Fmt1, [M0, DocId]),
+ throw(failed)
+ end,
+
+ F0 = couch_util:get_value(<<"verify_function">>, Props),
+ try
+ F = binary_to_existing_atom(F0, latin1),
+ fun M:F/2
+ catch error:badarg ->
+ Fmt2 = "Missing function '~s' in '~s' for purge checkpoint '~s'",
+ couch_log:error(Fmt2, [F0, M0, DocId]),
+ throw(failed)
+ end.
+
+
+set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
+ check_is_admin(Db),
+ gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity);
+set_purge_infos_limit(_Db, _Limit) ->
+ throw(invalid_purge_infos_limit).
+
get_after_doc_read_fun(#db{after_doc_read = Fun}) ->
Fun.
@@ -392,8 +524,11 @@ get_user_ctx(?OLD_DB_REC = Db) ->
get_purge_seq(#db{}=Db) ->
{ok, couch_db_engine:get_purge_seq(Db)}.
-get_last_purged(#db{}=Db) ->
- {ok, couch_db_engine:get_last_purged(Db)}.
+get_oldest_purge_seq(#db{}=Db) ->
+ {ok, couch_db_engine:get_oldest_purge_seq(Db)}.
+
+get_purge_infos_limit(#db{}=Db) ->
+ couch_db_engine:get_purge_infos_limit(Db).
get_pid(#db{main_pid = Pid}) ->
Pid.
@@ -1406,6 +1541,14 @@ fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts).
+fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
+ fold_purge_infos(Db, StartPurgeSeq, Fun, Acc, []).
+
+
+fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
+ couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
+
+
count_changes_since(Db, SinceSeq) ->
couch_db_engine:count_changes_since(Db, SinceSeq).
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 6a30d6549..315f737be 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -94,79 +94,41 @@ handle_call({set_revs_limit, Limit}, _From, Db) ->
ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
{reply, ok, Db3, idle_limit()};
-handle_call({purge_docs, _IdRevs}, _From,
- #db{compactor_pid=Pid}=Db) when Pid /= nil ->
- {reply, {error, purge_during_compaction}, Db, idle_limit()};
-handle_call({purge_docs, IdRevs}, _From, Db) ->
- DocIds = [Id || {Id, _Revs} <- IdRevs],
- OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
-
- NewDocInfos = lists:flatmap(fun
- ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
- case couch_key_tree:remove_leafs(Tree, Revs) of
- {_, [] = _RemovedRevs} -> % no change
- [];
- {NewTree, RemovedRevs} ->
- NewFDI = FDI#full_doc_info{rev_tree = NewTree},
- [{FDI, NewFDI, RemovedRevs}]
- end;
- ({_, not_found}) ->
- []
- end, lists:zip(IdRevs, OldDocInfos)),
-
- InitUpdateSeq = couch_db_engine:get_update_seq(Db),
- InitAcc = {InitUpdateSeq, [], []},
- FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
- #full_doc_info{
- id = Id,
- rev_tree = OldTree
- } = OldFDI,
- {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
-
- {NewFDIAcc, NewSeqAcc} = case OldTree of
- [] ->
- % If we purged every #leaf{} in the doc record
- % then we're removing it completely from the
- % database.
- {FDIAcc, SeqAcc0};
- _ ->
- % Its possible to purge the #leaf{} that contains
- % the update_seq where this doc sits in the update_seq
- % sequence. Rather than do a bunch of complicated checks
- % we just re-label every #leaf{} and reinsert it into
- % the update_seq sequence.
- {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
- (_RevId, Leaf, leaf, InnerSeqAcc) ->
- {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
- (_RevId, Value, _Type, InnerSeqAcc) ->
- {Value, InnerSeqAcc}
- end, SeqAcc0, OldTree),
-
- NewFDI = OldFDI#full_doc_info{
- update_seq = SeqAcc1,
- rev_tree = NewTree
- },
-
- {[NewFDI | FDIAcc], SeqAcc1}
- end,
- NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
- {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
- end, InitAcc, NewDocInfos),
-
- {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
+handle_call({set_purge_infos_limit, Limit}, _From, Db) ->
+ {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+    {reply, ok, Db2, idle_limit()};
+
+handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
+ % Filter out any previously applied updates during
+ % internal replication
+ IsRepl = lists:member(replicated_changes, Options),
+ PurgeReqs = if not IsRepl -> PurgeReqs0; true ->
+ UUIDs = [UUID || {UUID, _Id, _Revs} <- PurgeReqs0],
+        PurgeInfos = couch_db_engine:load_purge_infos(Db, UUIDs),
+ lists:flatmap(fun
+ ({not_found, PReq}) -> [PReq];
+ ({{_, _, _, _}, _}) -> []
+ end, lists:zip(PurgeInfos, PurgeReqs0))
+ end,
- % We need to only use the list of #full_doc_info{} records
- % that we have actually changed due to a purge.
- PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
- Pairs = pair_purge_info(PreviousFDIs, FDIs),
+ Ids = lists:map(fun({_UUID, Id, _Revs}) -> Id end, PurgeReqs),
+ DocInfos = couch_db_engine:open_docs(Db, Ids),
+ UpdateSeq = couch_db_engine:get_update_seq(Db),
+ PurgeSeq = couch_db_engine:get_purge_seq(Db),
- {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
- Db3 = commit_data(Db2),
- ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
- couch_event:notify(Db#db.name, updated),
+ InitAcc = {[], [], []},
+ {Pairs, PInfos, Replies} = purge_docs(
+ PurgeReqs, DocInfos, UpdateSeq, PurgeSeq, InitAcc),
- PurgeSeq = couch_db_engine:get_purge_seq(Db3),
- {reply, {ok, PurgeSeq, PurgedIdRevs}, Db3, idle_limit()};
+ Db3 = if Pairs == [] -> Db; true ->
+ {ok, Db1} = couch_db_engine:purge_docs(Db, Pairs, PInfos),
+ Db2 = commit_data(Db1),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+ couch_event:notify(Db2#db.name, updated),
+ Db2
+ end,
+ {reply, {ok, Replies}, Db3, idle_limit()};
handle_call(Msg, From, Db) ->
case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
@@ -646,7 +608,7 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
Pairs = pair_write_info(OldDocLookups, IndexFDIs),
LocalDocs2 = update_local_doc_revs(LocalDocs),
- {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
+ {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2),
WriteCount = length(IndexFDIs),
couch_stats:increment_counter([couchdb, document_inserts],
@@ -692,6 +654,57 @@ update_local_doc_revs(Docs) ->
end, Docs).
+purge_docs([], [], _USeq, _PSeq, {Pairs, PInfos, Replies}) ->
+ {lists:reverse(Pairs), lists:reverse(PInfos), lists:reverse(Replies)};
+
+purge_docs([Req | RestReqs], [FDI | RestInfos], USeq, PSeq, Acc) ->
+ {UUID, DocId, Revs} = Req,
+ {Pair, RemovedRevs, NewUSeq} = case FDI of
+ #full_doc_info{rev_tree = Tree} ->
+ case couch_key_tree:remove_leafs(Tree, Revs) of
+ {_, []} ->
+ % No change
+ {no_change, [], USeq};
+ {[], Removed} ->
+ % Completely purged
+ {{FDI, not_found}, Removed, USeq};
+ {NewTree, Removed} ->
+ % Its possible to purge the #leaf{} that contains
+ % the update_seq where this doc sits in the
+ % update_seq sequence. Rather than do a bunch of
+ % complicated checks we just re-label every #leaf{}
+ % and reinsert it into the update_seq sequence.
+ {NewTree2, NewUpdateSeq} = couch_key_tree:mapfold(fun
+ (_RevId, Leaf, leaf, SeqAcc) ->
+ {Leaf#leaf{seq = SeqAcc + 1},
+ SeqAcc + 1};
+ (_RevId, Value, _Type, SeqAcc) ->
+ {Value, SeqAcc}
+ end, USeq, NewTree),
+
+ NewFDI = FDI#full_doc_info{
+ update_seq = NewUpdateSeq,
+ rev_tree = NewTree2
+ },
+ {{FDI, NewFDI}, Removed, NewUpdateSeq}
+ end;
+ not_found ->
+ % Not found means nothing to change
+ {no_change, [], USeq}
+ end,
+ {Pairs, PInfos, Replies} = Acc,
+ NewPairs = case Pair of
+ no_change -> Pairs;
+ _ -> [Pair | Pairs]
+ end,
+ NewAcc = {
+ NewPairs,
+ [{PSeq + 1, UUID, DocId, Revs} | PInfos],
+ [{ok, RemovedRevs} | Replies]
+ },
+ purge_docs(RestReqs, RestInfos, NewUSeq, PSeq + 1, NewAcc).
+
+
commit_data(Db) ->
commit_data(Db, false).
@@ -721,15 +734,6 @@ pair_write_info(Old, New) ->
end, New).
-pair_purge_info(Old, New) ->
- lists:map(fun(OldFDI) ->
- case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
- #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
- false -> {OldFDI, not_found}
- end
- end, Old).
-
-
get_meta_body_size(Meta) ->
{ejson_size, ExternalSize} = lists:keyfind(ejson_size, 1, Meta),
ExternalSize.
diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl
index 99b1192a9..27f7d7046 100644
--- a/src/couch/src/couch_httpd_db.erl
+++ b/src/couch/src/couch_httpd_db.erl
@@ -376,17 +376,22 @@ db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");
db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
+ couch_stats:increment_counter([couchdb, httpd, purge_requests]),
couch_httpd:validate_ctype(Req, "application/json"),
- {IdsRevs} = couch_httpd:json_body_obj(Req),
- IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
+ {IdRevs} = couch_httpd:json_body_obj(Req),
+ PurgeReqs = lists:map(fun({Id, JsonRevs}) ->
+ {couch_uuids:new(), Id, couch_doc:parse_revs(JsonRevs)}
+ end, IdRevs),
- case couch_db:purge_docs(Db, IdsRevs2) of
- {ok, PurgeSeq, PurgedIdsRevs} ->
- PurgedIdsRevs2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
- send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]});
- Error ->
- throw(Error)
- end;
+ {ok, Replies} = couch_db:purge_docs(Db, PurgeReqs),
+
+    Results = lists:zipwith(fun({Id, _}, {ok, PurgedRevs}) ->
+        {Id, couch_doc:revs_to_strs(PurgedRevs)}
+    end, IdRevs, Replies),
+
+ {ok, Db2} = couch_db:reopen(Db),
+ {ok, PurgeSeq} = couch_db:get_purge_seq(Db2),
+ send_json(Req, 200, {[{purge_seq, PurgeSeq}, {purged, {Results}}]});
db_req(#httpd{path_parts=[_,<<"_purge">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");