From d708bd6776fcce2cb243a05253fce477b037d5cb Mon Sep 17 00:00:00 2001 From: "Paul J. Davis" Date: Fri, 14 Dec 2018 11:06:03 -0600 Subject: Enforce partition size limits This limit helps prevent users from inadvertently misusing partitions by refusing to add documents when the size of a partition exceeds 10GiB. Co-authored-by: Robert Newson --- rel/overlay/etc/default.ini | 5 + src/chttpd/src/chttpd.erl | 5 + src/couch/src/couch_db_updater.erl | 92 +++++++- test/elixir/test/partition_size_limit_test.exs | 305 +++++++++++++++++++++++++ 4 files changed, 403 insertions(+), 4 deletions(-) create mode 100644 test/elixir/test/partition_size_limit_test.exs diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index a77add4bd..ae9d3133e 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -64,6 +64,11 @@ default_engine = couch ; move deleted databases/shards there instead. You can then manually delete ; these files later, as desired. ;enable_database_recovery = false +; +; Set the maximum size allowed for a partition. This helps users avoid +; inadvertently abusing partitions resulting in hot shards. The default +; is 10GiB. A value of 0 or less will disable partition size checks. +;max_partition_size = 10737418240 [couchdb_engines] ; The keys in this section are the filename extension that diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index 2f241cdad..0b3349a24 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -873,6 +873,11 @@ error_info(conflict) -> {409, <<"conflict">>, <<"Document update conflict.">>}; error_info({conflict, _}) -> {409, <<"conflict">>, <<"Document update conflict.">>}; +error_info({partition_overflow, DocId}) -> + Descr = << + "Partition limit exceeded due to update on '", DocId/binary, "'" + >>, + {403, <<"partition_overflow">>, Descr}; error_info({{not_found, missing}, {_, _}}) -> {409, <<"not_found">>, <<"missing_rev">>}; error_info({forbidden, Error, Msg}) -> diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl index 95508e248..4227ff036 100644 --- a/src/couch/src/couch_db_updater.erl +++ b/src/couch/src/couch_db_updater.erl @@ -21,6 +21,7 @@ -include("couch_db_int.hrl"). -define(IDLE_LIMIT_DEFAULT, 61000). +-define(DEFAULT_MAX_PARTITION_SIZE, 16#280000000). % 10 GiB -record(merge_acc, { @@ -28,7 +29,8 @@ merge_conflicts, add_infos = [], rem_seqs = [], - cur_seq + cur_seq, + full_partitions = [] }). @@ -466,13 +468,22 @@ merge_rev_trees([], [], Acc) -> merge_rev_trees([NewDocs | RestDocsList], [OldDocInfo | RestOldInfo], Acc) -> #merge_acc{ revs_limit = Limit, - merge_conflicts = MergeConflicts + merge_conflicts = MergeConflicts, + full_partitions = FullPartitions } = Acc, % Track doc ids so we can debug large revision trees erlang:put(last_id_merged, OldDocInfo#full_doc_info.id), NewDocInfo0 = lists:foldl(fun({Client, NewDoc}, OldInfoAcc) -> - merge_rev_tree(OldInfoAcc, NewDoc, Client, MergeConflicts) + NewInfo = merge_rev_tree(OldInfoAcc, NewDoc, Client, MergeConflicts), + case is_overflowed(NewInfo, OldInfoAcc, FullPartitions) of + true when not MergeConflicts -> + DocId = NewInfo#full_doc_info.id, + send_result(Client, NewDoc, {partition_overflow, DocId}), + OldInfoAcc; + _ -> + NewInfo + end end, OldDocInfo, NewDocs), NewDocInfo1 = maybe_stem_full_doc_info(NewDocInfo0, Limit), % When MergeConflicts is false, we updated #full_doc_info.deleted on every @@ -595,6 +606,24 @@ merge_rev_tree(OldInfo, NewDoc, _Client, true) -> {NewTree, _} = couch_key_tree:merge(OldTree, NewTree0), OldInfo#full_doc_info{rev_tree = NewTree}. +is_overflowed(_New, _Old, []) -> + false; +is_overflowed(Old, Old, _FullPartitions) -> + false; +is_overflowed(New, Old, FullPartitions) -> + case New#full_doc_info.id of + <<"_design/", _/binary>> -> + false; + DDocId -> + Partition = couch_partition:from_docid(DDocId), + case lists:member(Partition, FullPartitions) of + true -> + estimate_size(New) > estimate_size(Old); + false -> + false + end + end. + maybe_stem_full_doc_info(#full_doc_info{rev_tree = Tree} = Info, Limit) -> case config:get_boolean("couchdb", "stem_interactive_updates", true) of true -> @@ -617,13 +646,34 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) -> (Id, not_found) -> #full_doc_info{id=Id} end, Ids, OldDocLookups), + + %% Get the list of full partitions + FullPartitions = case couch_db:is_partitioned(Db) of + true -> + case max_partition_size() of + N when N =< 0 -> + []; + Max -> + Partitions = lists:usort(lists:flatmap(fun(Id) -> + case couch_partition:extract(Id) of + undefined -> []; + {Partition, _} -> [Partition] + end + end, Ids)), + [P || P <- Partitions, partition_size(Db, P) >= Max] + end; + false -> + [] + end, + % Merge the new docs into the revision trees. AccIn = #merge_acc{ revs_limit = RevsLimit, merge_conflicts = MergeConflicts, add_infos = [], rem_seqs = [], - cur_seq = UpdateSeq + cur_seq = UpdateSeq, + full_partitions = FullPartitions }, {ok, AccOut} = merge_rev_trees(DocsList, OldDocInfos, AccIn), #merge_acc{ @@ -685,6 +735,40 @@ increment_local_doc_revs(#doc{revs = {0, [RevStr | _]}} = Doc) -> increment_local_doc_revs(#doc{}) -> {error, <<"Invalid rev format">>}. +max_partition_size() -> + config:get_integer("couchdb", "max_partition_size", + ?DEFAULT_MAX_PARTITION_SIZE). + +partition_size(Db, Partition) -> + {ok, Info} = couch_db:get_partition_info(Db, Partition), + Sizes = couch_util:get_value(sizes, Info), + couch_util:get_value(external, Sizes). + +estimate_size(#full_doc_info{} = FDI) -> + #full_doc_info{rev_tree = RevTree} = FDI, + Fun = fun + (_Rev, Value, leaf, SizesAcc) -> + case Value of + #doc{} = Doc -> + ExternalSize = get_meta_body_size(Value#doc.meta), + {size_info, AttSizeInfo} = + lists:keyfind(size_info, 1, Doc#doc.meta), + Leaf = #leaf{ + sizes = #size_info{ + external = ExternalSize + }, + atts = AttSizeInfo + }, + add_sizes(leaf, Leaf, SizesAcc); + #leaf{} -> + add_sizes(leaf, Value, SizesAcc) + end; + (_Rev, _Value, branch, SizesAcc) -> + SizesAcc + end, + {_, FinalES, FinalAtts} = couch_key_tree:fold(Fun, {0, 0, []}, RevTree), + TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts), + FinalES + TotalAttSize. purge_docs(Db, []) -> {ok, Db, []}; diff --git a/test/elixir/test/partition_size_limit_test.exs b/test/elixir/test/partition_size_limit_test.exs new file mode 100644 index 000000000..b4be6480e --- /dev/null +++ b/test/elixir/test/partition_size_limit_test.exs @@ -0,0 +1,305 @@ +defmodule PartitionSizeLimitTest do + use CouchTestCase + + @moduledoc """ + Test Partition size limit functionality + """ + + @max_size 10_240 + + setup do + db_name = random_db_name() + {:ok, _} = create_db(db_name, query: %{partitioned: true, q: 1}) + on_exit(fn -> delete_db(db_name) end) + + set_config({"couchdb", "max_partition_size", Integer.to_string(@max_size)}) + + {:ok, [db_name: db_name]} + end + + defp get_db_info(dbname) do + resp = Couch.get("/#{dbname}") + assert resp.status_code == 200 + %{:body => body} = resp + body + end + + defp get_partition_info(dbname, partition) do + resp = Couch.get("/#{dbname}/_partition/#{partition}") + assert resp.status_code == 200 + %{:body => body} = resp + body + end + + defp open_doc(db_name, docid, status_assert \\ 200) do + resp = Couch.get("/#{db_name}/#{docid}") + assert resp.status_code == status_assert + %{:body => body} = resp + body + end + + defp save_doc(db_name, doc, status_assert \\ 201) do + resp = Couch.post("/#{db_name}", query: [w: 3], body: doc) + assert resp.status_code == status_assert + %{:body => body} = resp + body["rev"] + end + + defp delete_doc(db_name, doc, status_assert \\ 200) do + url = "/#{db_name}/#{doc["_id"]}" + rev = doc["_rev"] + resp = Couch.delete(url, query: [w: 3, rev: rev]) + assert resp.status_code == status_assert + %{:body => body} = resp + body["rev"] + end + + defp fill_partition(db_name, partition \\ "foo") do + docs = + 1..15 + |> Enum.map(fn i -> + id = i |> Integer.to_string() |> String.pad_leading(4, "0") + docid = "#{partition}:#{id}" + %{_id: docid, value: "0" |> String.pad_leading(1024)} + end) + + body = %{:w => 3, :docs => docs} + resp = Couch.post("/#{db_name}/_bulk_docs", body: body) + assert resp.status_code == 201 + end + + defp compact(db) do + assert Couch.post("/#{db}/_compact").status_code == 202 + + retry_until( + fn -> + Couch.get("/#{db}").body["compact_running"] == false + end, + 200, + 20_000 + ) + end + + test "fill partition manually", context do + db_name = context[:db_name] + partition = "foo" + + resp = + 1..1000 + |> Enum.find_value(0, fn i -> + id = i |> Integer.to_string() |> String.pad_leading(4, "0") + docid = "#{partition}:#{id}" + doc = %{_id: docid, value: "0" |> String.pad_leading(1024)} + resp = Couch.post("/#{db_name}", query: [w: 3], body: doc) + + if resp.status_code == 201 do + false + else + resp + end + end) + + assert resp.status_code == 403 + %{body: body} = resp + assert body["error"] == "partition_overflow" + + info = get_partition_info(db_name, partition) + assert info["sizes"]["external"] >= @max_size + end + + test "full partitions reject POST /dbname", context do + db_name = context[:db_name] + fill_partition(db_name) + + doc = %{_id: "foo:bar", value: "stuff"} + resp = Couch.post("/#{db_name}", query: [w: 3], body: doc) + assert resp.status_code == 403 + %{body: body} = resp + assert body["error"] == "partition_overflow" + end + + test "full partitions reject PUT /dbname/docid", context do + db_name = context[:db_name] + fill_partition(db_name) + + doc = %{value: "stuff"} + resp = Couch.put("/#{db_name}/foo:bar", query: [w: 3], body: doc) + assert resp.status_code == 403 + %{body: body} = resp + assert body["error"] == "partition_overflow" + end + + test "full partitions reject POST /dbname/_bulk_docs", context do + db_name = context[:db_name] + fill_partition(db_name) + + body = %{w: 3, docs: [%{_id: "foo:bar"}]} + resp = Couch.post("/#{db_name}/_bulk_docs", query: [w: 3], body: body) + assert resp.status_code == 201 + %{body: body} = resp + doc_resp = Enum.at(body, 0) + assert doc_resp["error"] == "partition_overflow" + end + + test "full partitions with mixed POST /dbname/_bulk_docs", context do + db_name = context[:db_name] + fill_partition(db_name) + + body = %{w: 3, docs: [%{_id: "foo:bar"}, %{_id: "baz:bang"}]} + resp = Couch.post("/#{db_name}/_bulk_docs", query: [w: 3], body: body) + assert resp.status_code == 201 + %{body: body} = resp + + doc_resp1 = Enum.at(body, 0) + assert doc_resp1["error"] == "partition_overflow" + + doc_resp2 = Enum.at(body, 1) + assert doc_resp2["ok"] + end + + test "full partitions are still readable", context do + db_name = context[:db_name] + fill_partition(db_name) + open_doc(db_name, "foo:0001") + end + + test "full partitions can accept deletes", context do + db_name = context[:db_name] + fill_partition(db_name) + + doc = open_doc(db_name, "foo:0001") + delete_doc(db_name, doc) + end + + test "full partitions can accept updates that reduce size", context do + db_name = context[:db_name] + fill_partition(db_name) + + doc = open_doc(db_name, "foo:0001") + save_doc(db_name, %{doc | "value" => ""}) + end + + test "full partition does not affect other partitions", context do + db_name = context[:db_name] + fill_partition(db_name) + save_doc(db_name, %{_id: "bar:foo", value: "stuff"}) + end + + test "full partition does not affect design documents", context do + db_name = context[:db_name] + fill_partition(db_name) + rev1 = save_doc(db_name, %{_id: "_design/foo", value: "stuff"}) + save_doc(db_name, %{_id: "_design/foo", _rev: rev1, value: "hi"}) + doc = open_doc(db_name, "_design/foo") + delete_doc(db_name, doc) + end + + test "replication into a full partition works", context do + db_name = context[:db_name] + fill_partition(db_name) + save_doc(db_name, %{_id: "foo:bar", value: "stuff"}, 403) + + doc = %{ + _id: "foo:bar", + _rev: <<"1-23202479633c2b380f79507a776743d5">>, + value: "stuff" + } + + url = "/#{db_name}/#{doc[:_id]}" + query = [new_edits: false, w: 3] + resp = Couch.put(url, query: query, body: doc) + assert resp.status_code == 201 + end + + test "compacting a full partition works", context do + db_name = context[:db_name] + db_info1 = get_db_info(db_name) + fill_partition(db_name) + compact(db_name) + db_info2 = get_db_info(db_name) + assert db_info2["sizes"]["file"] != db_info1["sizes"]["file"] + end + + test "indexing a full partition works", context do + db_name = context[:db_name] + fill_partition(db_name) + + ddoc = %{ + _id: "_design/foo", + views: %{ + bar: %{ + map: "function(doc) {emit(doc.group, 1);}" + } + } + } + + save_doc(db_name, ddoc) + + url = "/#{db_name}/_partition/foo/_design/foo/_view/bar" + resp = Couch.get(url) + assert resp.status_code == 200 + %{body: body} = resp + + assert length(body["rows"]) > 0 + end + + test "purging docs allows writes", context do + db_name = context[:db_name] + fill_partition(db_name) + + info = get_partition_info(db_name, "foo") + limit = info["doc_count"] - 1 + + query = [ + start_key: "\"foo:0000\"", + end_key: "\"foo:9999\"", + limit: limit + ] + + resp = Couch.get("/#{db_name}/_all_docs", query: query) + assert resp.status_code == 200 + %{body: body} = resp + + pbody = + body["rows"] + |> Enum.reduce(%{}, fn row, acc -> + Map.put(acc, row["id"], [row["value"]["rev"]]) + end) + + resp = Couch.post("/#{db_name}/_purge", query: [w: 3], body: pbody) + assert resp.status_code == 201 + + save_doc(db_name, %{_id: "foo:bar", value: "some value"}) + end + + test "increasing partition size allows more writes", context do + db_name = context[:db_name] + fill_partition(db_name) + + # We use set_config_raw so that we're not setting + # on_exit handlers that might interfere with the original + # config change done in setup of this test + new_size = Integer.to_string(@max_size * 1000) + set_config_raw("couchdb", "max_partition_size", new_size) + + save_doc(db_name, %{_id: "foo:bar", value: "stuff"}) + end + + test "decreasing partition size disables more writes", context do + db_name = context[:db_name] + + # We use set_config_raw so that we're not setting + # on_exit handlers that might interfere with the original + # config change done in setup of this test + new_size = Integer.to_string(@max_size * 1000) + set_config_raw("couchdb", "max_partition_size", new_size) + + fill_partition(db_name) + save_doc(db_name, %{_id: "foo:bar", value: "stuff"}) + + old_size = Integer.to_string(@max_size) + set_config_raw("couchdb", "max_partition_size", old_size) + + save_doc(db_name, %{_id: "foo:baz", value: "stuff"}, 403) + end +end -- cgit v1.2.1