summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2018-04-24 12:24:10 -0500
committerjiangph <jiangph@cn.ibm.com>2018-08-22 00:59:15 +0800
commiteca282e3fa4285dfeb2ff1591f66330e8d18c868 (patch)
tree16a15f1ea011e87969377ad539008b6e7bce5d75
parentfebb0fc8b5abe2b8b3f62c8806ab799919ab897e (diff)
downloadcouchdb-eca282e3fa4285dfeb2ff1591f66330e8d18c868.tar.gz
[02/10] Clustered Purge: Update single node APIs
This patch updates the single node API implementations for use with the new clustered purge API. At the single node level the major change is to store a history of purge requests that can then be consumed by various other parts of the database system. The simpler of the major areas to use this new functionality will be any secondary indices. Rather than checking that only a single purge request has occurred each secondary index will store a _local document referencing its oldest processed purge request. During index updates each secondary index implementation will process any new purge requests and update its local doc checkpoint. In this way secondary indexes will no longer be sensitive to reset when multiple purge requests are issued against the database. The two other major areas that will make use of the newly stored purge request history are both of the anit-entropy mechanisms: read-repair and internal replication. Read-repair will use the purge request history to know when a node should discard updates that have come from a node that has not yet processed a purge request during internal replication. Otherwise read-repair would effectively undo any purge replication that happened "recently". Internal replication will use the purge request history to be able to mend any differences between shards. For instance, if a shard is down when a purge request is issue against a cluster this process will pull the purge request and apply it during internal replication. And similarly any local purge requests will be applied on the target before normal internal replication. COUCHDB-3326 Co-authored-by: Mayya Sharipova <mayyas@ca.ibm.com> Co-authored-by: jiangphcn <jiangph@cn.ibm.com>
-rw-r--r--src/couch/priv/stats_descriptions.cfg12
-rw-r--r--src/couch/src/couch_db.erl157
-rw-r--r--src/couch/src/couch_db_plugin.erl6
-rw-r--r--src/couch/src/couch_db_updater.erl185
-rw-r--r--src/couch/src/couch_httpd_db.erl23
5 files changed, 284 insertions, 99 deletions
diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg
index f0919782e..bceb0cea8 100644
--- a/src/couch/priv/stats_descriptions.cfg
+++ b/src/couch/priv/stats_descriptions.cfg
@@ -34,6 +34,10 @@
{type, counter},
{desc, <<"number of times a document was read from a database">>}
]}.
+{[couchdb, database_purges], [
+ {type, counter},
+ {desc, <<"number of times a database was purged">>}
+]}.
{[couchdb, db_open_time], [
{type, histogram},
{desc, <<"milliseconds required to open a database">>}
@@ -46,6 +50,10 @@
{type, counter},
{desc, <<"number of document write operations">>}
]}.
+{[couchdb, document_purges], [
+ {type, counter},
+ {desc, <<"number of document purge operations">>}
+]}.
{[couchdb, local_document_writes], [
{type, counter},
{desc, <<"number of _local document write operations">>}
@@ -74,6 +82,10 @@
{type, counter},
{desc, <<"number of clients for continuous _changes">>}
]}.
+{[couchdb, httpd, purge_requests], [
+ {type, counter},
+ {desc, <<"number of purge requests">>}
+]}.
{[couchdb, httpd_request_methods, 'COPY'], [
{type, counter},
{desc, <<"number of HTTP COPY requests">>}
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 40c673a8b..8e932b2ed 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -43,7 +43,6 @@
get_epochs/1,
get_filepath/1,
get_instance_start_time/1,
- get_last_purged/1,
get_pid/1,
get_revs_limit/1,
get_security/1,
@@ -51,12 +50,15 @@
get_user_ctx/1,
get_uuid/1,
get_purge_seq/1,
+ get_oldest_purge_seq/1,
+ get_purge_infos_limit/1,
is_db/1,
is_system_db/1,
is_clustered/1,
set_revs_limit/2,
+ set_purge_infos_limit/2,
set_security/2,
set_user_ctx/2,
@@ -75,6 +77,10 @@
get_full_doc_infos/2,
get_missing_revs/2,
get_design_docs/1,
+ get_purge_infos/2,
+
+ get_minimum_purge_seq/1,
+ purge_client_exists/3,
update_doc/3,
update_doc/4,
@@ -84,6 +90,7 @@
delete_doc/3,
purge_docs/2,
+ purge_docs/3,
with_stream/3,
open_write_stream/2,
@@ -97,6 +104,8 @@
fold_changes/4,
fold_changes/5,
count_changes_since/2,
+ fold_purge_infos/4,
+ fold_purge_infos/5,
calculate_start_seq/3,
owner_of/2,
@@ -369,8 +378,129 @@ get_full_doc_info(Db, Id) ->
get_full_doc_infos(Db, Ids) ->
couch_db_engine:open_docs(Db, Ids).
-purge_docs(#db{main_pid=Pid}, IdsRevs) ->
- gen_server:call(Pid, {purge_docs, IdsRevs}).
+purge_docs(Db, IdRevs) ->
+ purge_docs(Db, IdRevs, []).
+
+-spec purge_docs(#db{}, [{UUId, Id, [Rev]}], [PurgeOption]) ->
+ {ok, [Reply]} when
+ UUId :: binary(),
+ Id :: binary(),
+ Rev :: {non_neg_integer(), binary()},
+ PurgeOption :: interactive_edit | replicated_changes,
+ Reply :: {ok, []} | {ok, [Rev]}.
+purge_docs(#db{main_pid = Pid} = Db, UUIDsIdsRevs, Options) ->
+ % Check here if any UUIDs already exist when
+ % we're not replicating purge infos
+ IsRepl = lists:member(replicated_changes, Options),
+ if IsRepl -> ok; true ->
+ UUIDs = [UUID || {UUID, _, _} <- UUIDsIdsRevs],
+ lists:foreach(fun(Resp) ->
+ if Resp == not_found -> ok; true ->
+ Fmt = "Duplicate purge info UIUD: ~s",
+ Reason = io_lib:format(Fmt, [element(2, Resp)]),
+ throw({badreq, Reason})
+ end
+ end, get_purge_infos(Db, UUIDs))
+ end,
+ increment_stat(Db, [couchdb, database_purges]),
+ gen_server:call(Pid, {purge_docs, UUIDsIdsRevs, Options}).
+
+-spec get_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when
+ UUId :: binary(),
+ PurgeInfo :: {PurgeSeq, UUId, Id, [Rev]} | not_found,
+ PurgeSeq :: non_neg_integer(),
+ Id :: binary(),
+ Rev :: {non_neg_integer(), binary()}.
+get_purge_infos(Db, UUIDs) ->
+ couch_db_engine:load_purge_infos(Db, UUIDs).
+
+
+get_minimum_purge_seq(#db{} = Db) ->
+ PurgeSeq = couch_db_engine:get_purge_seq(Db),
+ OldestPurgeSeq = couch_db_engine:get_oldest_purge_seq(Db),
+ PurgeInfosLimit = couch_db_engine:get_purge_infos_limit(Db),
+
+ FoldFun = fun(#doc{id = DocId, body = {Props}}, SeqAcc) ->
+ case DocId of
+ <<?LOCAL_DOC_PREFIX, "purge-", _/binary>> ->
+ ClientSeq = couch_util:get_value(<<"purge_seq">>, Props),
+ case ClientSeq of
+ CS when is_integer(CS), CS >= PurgeSeq - PurgeInfosLimit ->
+ {ok, SeqAcc};
+ CS when is_integer(CS) ->
+ case purge_client_exists(Db, DocId, Props) of
+ true -> {ok, erlang:min(CS, SeqAcc)};
+ false -> {ok, SeqAcc}
+ end;
+ _ ->
+ % If there's a broken doc we have to keep every
+ % purge info until the doc is fixed or removed.
+ Fmt = "Invalid purge doc '~s' on database ~p
+ with purge_seq '~w'",
+ DbName = couch_db:name(Db),
+ couch_log:error(Fmt, [DocId, DbName, ClientSeq]),
+ {ok, erlang:min(OldestPurgeSeq, SeqAcc)}
+ end;
+ _ ->
+ {stop, SeqAcc}
+ end
+ end,
+ InitMinSeq = PurgeSeq - PurgeInfosLimit,
+ Opts = [
+ {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")}
+ ],
+ {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, InitMinSeq, Opts),
+ FinalSeq = case MinIdxSeq < PurgeSeq - PurgeInfosLimit of
+ true -> MinIdxSeq;
+ false -> erlang:max(0, PurgeSeq - PurgeInfosLimit)
+ end,
+ % Log a warning if we've got a purge sequence exceeding the
+ % configured threshold.
+ if FinalSeq >= (PurgeSeq - PurgeInfosLimit) -> ok; true ->
+ Fmt = "The purge sequence for '~s' exceeds configured threshold",
+ couch_log:warning(Fmt, [couch_db:name(Db)])
+ end,
+ FinalSeq.
+
+
+purge_client_exists(DbName, DocId, Props) ->
+ % Warn about clients that have not updated their purge
+ % checkpoints in the last "index_lag_warn_seconds"
+ LagWindow = config:get_integer(
+ "purge", "index_lag_warn_seconds", 86400), % Default 24 hours
+
+ {Mega, Secs, _} = os:timestamp(),
+ NowSecs = Mega * 1000000 + Secs,
+ LagThreshold = NowSecs - LagWindow,
+
+ try
+ Exists = couch_db_plugin:is_valid_purge_client(DbName, Props),
+ if not Exists -> ok; true ->
+ Updated = couch_util:get_value(<<"updated_on">>, Props),
+ if is_integer(Updated) and Updated > LagThreshold -> ok; true ->
+ Diff = NowSecs - Updated,
+ Fmt1 = "Purge checkpoint '~s' not updated in ~p seconds
+ in database ~p",
+ couch_log:error(Fmt1, [DocId, Diff, DbName])
+ end
+ end,
+ Exists
+ catch _:_ ->
+ % If we fail to check for a client we have to assume that
+ % it exists.
+ Fmt2 = "Failed to check purge checkpoint using
+ document '~p' in database ~p",
+ couch_log:error(Fmt2, [DbName, DocId]),
+ true
+ end.
+
+
+set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
+ check_is_admin(Db),
+ gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity);
+set_purge_infos_limit(_Db, _Limit) ->
+ throw(invalid_purge_infos_limit).
+
get_after_doc_read_fun(#db{after_doc_read = Fun}) ->
Fun.
@@ -390,10 +520,13 @@ get_user_ctx(?OLD_DB_REC = Db) ->
?OLD_DB_USER_CTX(Db).
get_purge_seq(#db{}=Db) ->
- {ok, couch_db_engine:get_purge_seq(Db)}.
+ couch_db_engine:get_purge_seq(Db).
+
+get_oldest_purge_seq(#db{}=Db) ->
+ couch_db_engine:get_oldest_purge_seq(Db).
-get_last_purged(#db{}=Db) ->
- {ok, couch_db_engine:get_last_purged(Db)}.
+get_purge_infos_limit(#db{}=Db) ->
+ couch_db_engine:get_purge_infos_limit(Db).
get_pid(#db{main_pid = Pid}) ->
Pid.
@@ -471,7 +604,8 @@ get_db_info(Db) ->
],
{ok, InfoList}.
-get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
+get_design_docs(#db{name = <<"shards/", _:18/binary, DbFullName/binary>>}) ->
+ DbName = ?l2b(filename:rootname(filename:basename(?b2l(DbFullName)))),
{_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
receive {'DOWN', Ref, _, _, Response} ->
Response
@@ -481,7 +615,6 @@ get_design_docs(#db{} = Db) ->
{ok, Docs} = fold_design_docs(Db, FoldFun, [], []),
{ok, lists:reverse(Docs)}.
-
check_is_admin(#db{user_ctx=UserCtx}=Db) ->
case is_admin(Db) of
true -> ok;
@@ -1400,6 +1533,14 @@ fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts).
+fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
+ fold_purge_infos(Db, StartPurgeSeq, Fun, Acc, []).
+
+
+fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
+ couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
+
+
count_changes_since(Db, SinceSeq) ->
couch_db_engine:count_changes_since(Db, SinceSeq).
diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl
index 816325699..e25866ec4 100644
--- a/src/couch/src/couch_db_plugin.erl
+++ b/src/couch/src/couch_db_plugin.erl
@@ -18,6 +18,7 @@
after_doc_read/2,
validate_docid/1,
check_is_admin/1,
+ is_valid_purge_client/2,
on_compact/2,
on_delete/2
]).
@@ -57,6 +58,11 @@ check_is_admin(Db) ->
%% callbacks return true only if it specifically allow the given Id
couch_epi:any(Handle, ?SERVICE_ID, check_is_admin, [Db], []).
+is_valid_purge_client(DbName, Props) ->
+ Handle = couch_epi:get_handle(?SERVICE_ID),
+ %% callbacks return true only if it specifically allow the given Id
+ couch_epi:any(Handle, ?SERVICE_ID, is_valid_purge_client, [DbName, Props], []).
+
on_compact(DbName, DDocs) ->
Handle = couch_epi:get_handle(?SERVICE_ID),
couch_epi:apply(Handle, ?SERVICE_ID, on_compact, [DbName, DDocs], []).
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 40e836a4c..52a4d2f1b 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -94,79 +94,28 @@ handle_call({set_revs_limit, Limit}, _From, Db) ->
ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
{reply, ok, Db3, idle_limit()};
-handle_call({purge_docs, _IdRevs}, _From,
- #db{compactor_pid=Pid}=Db) when Pid /= nil ->
- {reply, {error, purge_during_compaction}, Db, idle_limit()};
-handle_call({purge_docs, IdRevs}, _From, Db) ->
- DocIds = [Id || {Id, _Revs} <- IdRevs],
- OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
-
- NewDocInfos = lists:flatmap(fun
- ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
- case couch_key_tree:remove_leafs(Tree, Revs) of
- {_, [] = _RemovedRevs} -> % no change
- [];
- {NewTree, RemovedRevs} ->
- NewFDI = FDI#full_doc_info{rev_tree = NewTree},
- [{FDI, NewFDI, RemovedRevs}]
- end;
- ({_, not_found}) ->
- []
- end, lists:zip(IdRevs, OldDocInfos)),
-
- InitUpdateSeq = couch_db_engine:get_update_seq(Db),
- InitAcc = {InitUpdateSeq, [], []},
- FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
- #full_doc_info{
- id = Id,
- rev_tree = OldTree
- } = OldFDI,
- {SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
-
- {NewFDIAcc, NewSeqAcc} = case OldTree of
- [] ->
- % If we purged every #leaf{} in the doc record
- % then we're removing it completely from the
- % database.
- {FDIAcc, SeqAcc0};
- _ ->
- % Its possible to purge the #leaf{} that contains
- % the update_seq where this doc sits in the update_seq
- % sequence. Rather than do a bunch of complicated checks
- % we just re-label every #leaf{} and reinsert it into
- % the update_seq sequence.
- {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
- (_RevId, Leaf, leaf, InnerSeqAcc) ->
- {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
- (_RevId, Value, _Type, InnerSeqAcc) ->
- {Value, InnerSeqAcc}
- end, SeqAcc0, OldTree),
-
- NewFDI = OldFDI#full_doc_info{
- update_seq = SeqAcc1,
- rev_tree = NewTree
- },
-
- {[NewFDI | FDIAcc], SeqAcc1}
- end,
- NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
- {NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
- end, InitAcc, NewDocInfos),
-
- {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
-
- % We need to only use the list of #full_doc_info{} records
- % that we have actually changed due to a purge.
- PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
- Pairs = pair_purge_info(PreviousFDIs, FDIs),
-
- {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
- Db3 = commit_data(Db2),
- ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
- couch_event:notify(Db#db.name, updated),
+handle_call({set_purge_infos_limit, Limit}, _From, Db) ->
+ {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+ {reply, ok, Db2, idle_limit()};
- PurgeSeq = couch_db_engine:get_purge_seq(Db3),
- {reply, {ok, PurgeSeq, PurgedIdRevs}, Db3, idle_limit()};
+handle_call({purge_docs, [], _}, _From, Db) ->
+ {reply, {ok, []}, Db, idle_limit()};
+
+handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
+ % Filter out any previously applied updates during
+ % internal replication
+ IsRepl = lists:member(replicated_changes, Options),
+ PurgeReqs = if not IsRepl -> PurgeReqs0; true ->
+ UUIDs = [UUID || {UUID, _Id, _Revs} <- PurgeReqs0],
+ PurgeInfos = couch_db_engine:load_purge_infos(Db, UUIDs),
+ lists:flatmap(fun
+ ({not_found, PReq}) -> [PReq];
+ ({{_, _, _, _}, _}) -> []
+ end, lists:zip(PurgeInfos, PurgeReqs0))
+ end,
+ {ok, NewDb, Replies} = purge_docs(Db, PurgeReqs),
+ {reply, {ok, Replies}, NewDb, idle_limit()};
handle_call(Msg, From, Db) ->
case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
@@ -656,7 +605,7 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
Pairs = pair_write_info(OldDocLookups, IndexFDIs),
LocalDocs2 = update_local_doc_revs(LocalDocs),
- {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
+ {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2),
WriteCount = length(IndexFDIs),
couch_stats:increment_counter([couchdb, document_inserts],
@@ -702,6 +651,87 @@ update_local_doc_revs(Docs) ->
end, Docs).
+purge_docs(Db, []) ->
+ {ok, Db, []};
+
+purge_docs(Db, PurgeReqs) ->
+ Ids = lists:usort(lists:map(fun({_UUID, Id, _Revs}) -> Id end, PurgeReqs)),
+ FDIs = couch_db_engine:open_docs(Db, Ids),
+ USeq = couch_db_engine:get_update_seq(Db),
+
+ IdFDIs = lists:zip(Ids, FDIs),
+ {NewIdFDIs, Replies} = apply_purge_reqs(PurgeReqs, IdFDIs, USeq, []),
+
+ Pairs = lists:flatmap(fun({DocId, OldFDI}) ->
+ {DocId, NewFDI} = lists:keyfind(DocId, 1, NewIdFDIs),
+ case {OldFDI, NewFDI} of
+ {not_found, not_found} ->
+ [];
+ {#full_doc_info{} = A, #full_doc_info{} = A} ->
+ [];
+ {#full_doc_info{}, _} ->
+ [{OldFDI, NewFDI}]
+ end
+ end, IdFDIs),
+
+ PSeq = couch_db_engine:get_purge_seq(Db),
+ {RevPInfos, _} = lists:foldl(fun({UUID, DocId, Revs}, {PIAcc, PSeqAcc}) ->
+ Info = {PSeqAcc + 1, UUID, DocId, Revs},
+ {[Info | PIAcc], PSeqAcc + 1}
+ end, {[], PSeq}, PurgeReqs),
+ PInfos = lists:reverse(RevPInfos),
+
+ {ok, Db1} = couch_db_engine:purge_docs(Db, Pairs, PInfos),
+ Db2 = commit_data(Db1),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+ couch_event:notify(Db2#db.name, updated),
+ {ok, Db2, Replies}.
+
+
+apply_purge_reqs([], IdFDIs, _USeq, Replies) ->
+ {IdFDIs, lists:reverse(Replies)};
+
+apply_purge_reqs([Req | RestReqs], IdFDIs, USeq, Replies) ->
+ {_UUID, DocId, Revs} = Req,
+ {value, {_, FDI0}, RestIdFDIs} = lists:keytake(DocId, 1, IdFDIs),
+ {NewFDI, RemovedRevs, NewUSeq} = case FDI0 of
+ #full_doc_info{rev_tree = Tree} ->
+ case couch_key_tree:remove_leafs(Tree, Revs) of
+ {_, []} ->
+ % No change
+ {FDI0, [], USeq};
+ {[], Removed} ->
+ % Completely purged
+ {not_found, Removed, USeq};
+ {NewTree, Removed} ->
+ % Its possible to purge the #leaf{} that contains
+ % the update_seq where this doc sits in the
+ % update_seq sequence. Rather than do a bunch of
+ % complicated checks we just re-label every #leaf{}
+ % and reinsert it into the update_seq sequence.
+ {NewTree2, NewUpdateSeq} = couch_key_tree:mapfold(fun
+ (_RevId, Leaf, leaf, SeqAcc) ->
+ {Leaf#leaf{seq = SeqAcc + 1},
+ SeqAcc + 1};
+ (_RevId, Value, _Type, SeqAcc) ->
+ {Value, SeqAcc}
+ end, USeq, NewTree),
+
+ FDI1 = FDI0#full_doc_info{
+ update_seq = NewUpdateSeq,
+ rev_tree = NewTree2
+ },
+ {FDI1, Removed, NewUpdateSeq}
+ end;
+ not_found ->
+ % Not found means nothing to change
+ {not_found, [], USeq}
+ end,
+ NewIdFDIs = [{DocId, NewFDI} | RestIdFDIs],
+ NewReplies = [{ok, RemovedRevs} | Replies],
+ apply_purge_reqs(RestReqs, NewIdFDIs, NewUSeq, NewReplies).
+
+
commit_data(Db) ->
commit_data(Db, false).
@@ -731,15 +761,6 @@ pair_write_info(Old, New) ->
end, New).
-pair_purge_info(Old, New) ->
- lists:map(fun(OldFDI) ->
- case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
- #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
- false -> {OldFDI, not_found}
- end
- end, Old).
-
-
get_meta_body_size(Meta) ->
{ejson_size, ExternalSize} = lists:keyfind(ejson_size, 1, Meta),
ExternalSize.
diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl
index 99b1192a9..81209d9ff 100644
--- a/src/couch/src/couch_httpd_db.erl
+++ b/src/couch/src/couch_httpd_db.erl
@@ -376,17 +376,22 @@ db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");
db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
+ couch_stats:increment_counter([couchdb, httpd, purge_requests]),
couch_httpd:validate_ctype(Req, "application/json"),
- {IdsRevs} = couch_httpd:json_body_obj(Req),
- IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
+ {IdRevs} = couch_httpd:json_body_obj(Req),
+ PurgeReqs = lists:map(fun({Id, JsonRevs}) ->
+ {couch_uuids:new(), Id, couch_doc:parse_revs(JsonRevs)}
+ end, IdRevs),
- case couch_db:purge_docs(Db, IdsRevs2) of
- {ok, PurgeSeq, PurgedIdsRevs} ->
- PurgedIdsRevs2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs],
- send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]});
- Error ->
- throw(Error)
- end;
+ {ok, Replies} = couch_db:purge_docs(Db, PurgeReqs),
+
+ Results = lists:zipwith(fun({Id, _}, {ok, Reply}) ->
+ {Id, couch_doc:revs_to_strs(Reply)}
+ end, IdRevs, Replies),
+
+ {ok, Db2} = couch_db:reopen(Db),
+ PurgeSeq = couch_db:get_purge_seq(Db2),
+ send_json(Req, 200, {[{purge_seq, PurgeSeq}, {purged, {Results}}]});
db_req(#httpd{path_parts=[_,<<"_purge">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");