From 17b5be544a89d60a21d16555d3c17db7c573b66b Mon Sep 17 00:00:00 2001
From: Robert Newson
Date: Thu, 18 Oct 2018 13:54:27 +0100
Subject: Enforce upper bound on partition size

For partitioned databases, update_docs_int/5 now builds the list of
partitions touched by the batch whose external size is already at or
above "couchdb"/"max_partition_size" (default 10000000000) and passes
it down through merge_rev_trees/8 to merge_rev_tree/5.

Interactive updates (MergeConflicts = false) that add a new,
non-deleted leaf to a document in such a partition are rejected with
{partition_overflow, Partition} unless the estimated size of the
document shrinks. Updates merged with MergeConflicts = true are not
checked. chttpd reports the error as 403 "partition_overflow".

New helpers in couch_db_updater:
  check_overflow/3  - true when the doc's partition is in the
                      prohibited list and the new estimated size is
                      >= the old one
  partition_size/2  - external size reported by get_partition_info/2
  estimate_size/1   - sum of leaf external sizes plus attachment sizes
  partition/1       - the part of the doc id before the first ":"

On the "recreate a deleted document" path the client is sent exactly
one result: the {ok, ...} reply is only sent after the overflow check
has passed, rather than before it and then again afterwards.
---
 src/chttpd/src/chttpd.erl          |   3 ++
 src/couch/src/couch_db.erl         |   5 ++
 src/couch/src/couch_db_updater.erl | 104 +++++++++++++++++++++++++++++++------
 3 files changed, 95 insertions(+), 17 deletions(-)

diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index a5628396b..0d50b264b 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -870,6 +870,9 @@ error_info(conflict) ->
     {409, <<"conflict">>, <<"Document update conflict.">>};
 error_info({conflict, _}) ->
     {409, <<"conflict">>, <<"Document update conflict.">>};
+error_info({partition_overflow, Partition}) ->
+    {403, <<"partition_overflow">>,
+        <<"partition '", Partition/binary, "' exceeds limit">>};
 error_info({{not_found, missing}, {_, _}}) ->
     {409, <<"not_found">>, <<"missing_rev">>};
 error_info({forbidden, Error, Msg}) ->
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 5a3e47dfc..8f2809da8 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -47,6 +47,7 @@
     get_revs_limit/1,
     get_security/1,
     get_props/1,
+    is_partitioned/1,
     get_update_seq/1,
     get_user_ctx/1,
     get_uuid/1,
@@ -740,6 +741,10 @@ set_prop(#db{main_pid=Pid}=Db, Key, Value) ->
     {ok, _} = ensure_full_commit(Db),
     ok.
 
+is_partitioned(#db{} = Db) ->
+    Props = get_props(Db),
+    proplists:get_value(partitioned, Props) == true.
+
 set_user_ctx(#db{} = Db, UserCtx) ->
     {ok, Db#db{user_ctx = UserCtx}}.
 
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 687fb7991..17fb22c25 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -18,6 +18,7 @@
 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("stdlib/include/assert.hrl").
 -include("couch_db_int.hrl").
 
 -define(IDLE_LIMIT_DEFAULT, 61000).
@@ -456,13 +457,13 @@ doc_tag(#doc{meta=Meta}) ->
         Else -> throw({invalid_doc_tag, Else})
     end.
 
-merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
-    {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
+merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq, PP) ->
+    {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq, PP};
 merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
-        [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
+        [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq, PP) ->
     erlang:put(last_id_merged, OldDocInfo#full_doc_info.id), % for debugging
     NewDocInfo0 = lists:foldl(fun({Client, NewDoc}, OldInfoAcc) ->
-        merge_rev_tree(OldInfoAcc, NewDoc, Client, MergeConflicts)
+        merge_rev_tree(OldInfoAcc, NewDoc, Client, MergeConflicts, PP)
     end, OldDocInfo, NewDocs),
     NewDocInfo1 = maybe_stem_full_doc_info(NewDocInfo0, Limit),
     % When MergeConflicts is false, we updated #full_doc_info.deleted on every
@@ -482,7 +483,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
     if NewDocInfo2 == OldDocInfo ->
         % nothing changed
         merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
-            AccNewInfos, AccRemoveSeqs, AccSeq);
+            AccNewInfos, AccRemoveSeqs, AccSeq, PP);
     true ->
         % We have updated the document, give it a new update_seq. Its
         % important to note that the update_seq on OldDocInfo should
@@ -496,10 +497,10 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
             _ -> [OldSeq | AccRemoveSeqs]
         end,
         merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
-            [NewDocInfo3|AccNewInfos], RemoveSeqs, AccSeq+1)
+            [NewDocInfo3|AccNewInfos], RemoveSeqs, AccSeq+1, PP)
     end.
 
-merge_rev_tree(OldInfo, NewDoc, Client, false)
+merge_rev_tree(OldInfo, NewDoc, Client, false, PP)
         when OldInfo#full_doc_info.deleted ->
     % We're recreating a document that was previously
     % deleted. To check that this is a recreation from
@@ -537,10 +538,19 @@ merge_rev_tree(OldInfo, NewDoc, Client, false)
                 {NewTree1, new_leaf} ->
-                    % We changed the revision id so inform the caller
-                    send_result(Client, NewDoc, {ok, {OldPos+1, NewRevId}}),
-                    OldInfo#full_doc_info{
+                    NewInfo = OldInfo#full_doc_info{
                         rev_tree = NewTree1,
                         deleted = false
-                    };
+                    },
+                    case check_overflow(NewInfo, OldInfo, PP) of
+                        true ->
+                            Partition = partition(OldInfo#full_doc_info.id),
+                            send_result(Client, NewDoc,
+                                {partition_overflow, Partition}),
+                            OldInfo;
+                        false ->
+                            % We changed the revision id so inform the caller
+                            send_result(Client, NewDoc, {ok, {OldPos+1, NewRevId}}),
+                            NewInfo
+                    end;
                 _ ->
                     throw(doc_recreation_failed)
             end;
@@ -548,7 +558,7 @@ merge_rev_tree(OldInfo, NewDoc, Client, false)
             send_result(Client, NewDoc, conflict),
             OldInfo
     end;
-merge_rev_tree(OldInfo, NewDoc, Client, false) ->
+merge_rev_tree(OldInfo, NewDoc, Client, false, PP) ->
     % We're attempting to merge a new revision into an
     % undeleted document. To not be a conflict we require
     % that the merge results in extending a branch.
@@ -558,10 +568,19 @@ merge_rev_tree(OldInfo, NewDoc, Client, false) ->
     NewDeleted = NewDoc#doc.deleted,
     case couch_key_tree:merge(OldTree, NewTree0) of
         {NewTree, new_leaf} when not NewDeleted ->
-            OldInfo#full_doc_info{
+            NewInfo = OldInfo#full_doc_info{
                 rev_tree = NewTree,
                 deleted = false
-            };
+            },
+            case check_overflow(NewInfo, OldInfo, PP) of
+                true ->
+                    Partition = partition(OldInfo#full_doc_info.id),
+                    send_result(Client, NewDoc,
+                        {partition_overflow, Partition}),
+                    OldInfo;
+                false ->
+                    NewInfo
+            end;
         {NewTree, new_leaf} when NewDeleted ->
             % We have to check if we just deleted this
             % document completely or if it was a conflict
@@ -574,7 +593,7 @@ merge_rev_tree(OldInfo, NewDoc, Client, false) ->
             send_result(Client, NewDoc, conflict),
             OldInfo
     end;
-merge_rev_tree(OldInfo, NewDoc, _Client, true) ->
+merge_rev_tree(OldInfo, NewDoc, _Client, true, _PP) ->
     % We're merging in revisions without caring about
     % conflicts. Most likely this is a replication update.
     OldTree = OldInfo#full_doc_info.rev_tree,
@@ -582,6 +601,12 @@ merge_rev_tree(OldInfo, NewDoc, _Client, true) ->
     {NewTree, _} = couch_key_tree:merge(OldTree, NewTree0),
     OldInfo#full_doc_info{rev_tree = NewTree}.
 
+check_overflow(#full_doc_info{} = _New, #full_doc_info{} = _Old, []) ->
+    false;
+check_overflow(#full_doc_info{} = New, #full_doc_info{} = Old, PP) ->
+    Partition = partition(New#full_doc_info.id),
+    lists:member(Partition, PP) andalso (estimate_size(New) >= estimate_size(Old)).
+
 maybe_stem_full_doc_info(#full_doc_info{rev_tree = Tree} = Info, Limit) ->
     case config:get_boolean("couchdb", "stem_interactive_updates", true) of
         true ->
@@ -604,9 +629,20 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
         (Id, not_found) ->
             #full_doc_info{id=Id}
     end, Ids, OldDocLookups),
+
+    %% get the list of partitions that are at or above the size limit.
+    ProhibitedPartitions = case couch_db:is_partitioned(Db) of
+        true ->
+            Max = config:get_integer("couchdb", "max_partition_size", 10000000000),
+            Partitions = lists:usort([partition(Id) || Id <- Ids]),
+            [P || P <- Partitions, partition_size(Db, P) >= Max];
+        false ->
+            []
+    end,
+
     % Merge the new docs into the revision trees.
-    {ok, NewFullDocInfos, RemSeqs, _} = merge_rev_trees(RevsLimit,
-            MergeConflicts, DocsList, OldDocInfos, [], [], UpdateSeq),
+    {ok, NewFullDocInfos, RemSeqs, _, _} = merge_rev_trees(RevsLimit,
+            MergeConflicts, DocsList, OldDocInfos, [], [], UpdateSeq, ProhibitedPartitions),
 
     % Write out the document summaries (the bodies are stored in the nodes of
     % the trees, the attachments are already written to disk)
@@ -634,6 +670,40 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
 
     {ok, commit_data(Db1, not FullCommit), UpdatedDDocIds}.
 
+partition_size(#db{} = Db, Partition) ->
+    {ok, Info} = couch_db:get_partition_info(Db, Partition),
+    Sizes = couch_util:get_value(sizes, Info),
+    couch_util:get_value(external, Sizes).
+
+estimate_size(#full_doc_info{} = FDI) ->
+    #full_doc_info{rev_tree = RevTree} = FDI,
+    Fun = fun
+        (_Rev, Value, leaf, SizesAcc) ->
+            case Value of
+                #doc{} = Doc ->
+                    ExternalSize = get_meta_body_size(Value#doc.meta),
+                    {size_info, AttSizeInfo} =
+                        lists:keyfind(size_info, 1, Doc#doc.meta),
+                    Leaf = #leaf{
+                        sizes = #size_info{
+                            external = ExternalSize
+                        },
+                        atts = AttSizeInfo
+                    },
+                    add_sizes(leaf, Leaf, SizesAcc);
+                #leaf{} ->
+                    add_sizes(leaf, Value, SizesAcc)
+            end;
+        (_Rev, _Value, branch, SizesAcc) ->
+            SizesAcc
+    end,
+    {_FinalAS, FinalES, FinalAtts} = couch_key_tree:fold(Fun, {0, 0, []}, RevTree),
+    TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
+    FinalES + TotalAttSize.
+
+partition(Id) ->
+    [Partition | _] = binary:split(Id, <<":">>),
+    Partition.
 
 update_local_doc_revs(Docs) ->
     lists:map(fun({Client, NewDoc}) ->
-- 
cgit v1.2.1