summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2018-12-14 11:06:03 -0600
committerPaul J. Davis <paul.joseph.davis@gmail.com>2019-01-18 12:43:11 -0600
commitd708bd6776fcce2cb243a05253fce477b037d5cb (patch)
treed2d329937e04ac040ad84353b8a6f116abbc0629
parent68a0557f93cdc4a519bf28ae5f628ea76fabe72a (diff)
downloadcouchdb-d708bd6776fcce2cb243a05253fce477b037d5cb.tar.gz
Enforce partition size limits (branch: feature/database-partition-limits)
This limit helps prevent users from inadvertently misusing partitions by refusing to add documents when the size of a partition exceeds 10GiB.

Co-authored-by: Robert Newson <rnewson@apache.org>
-rw-r--r--rel/overlay/etc/default.ini5
-rw-r--r--src/chttpd/src/chttpd.erl5
-rw-r--r--src/couch/src/couch_db_updater.erl92
-rw-r--r--test/elixir/test/partition_size_limit_test.exs305
4 files changed, 403 insertions, 4 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index a77add4bd..ae9d3133e 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -64,6 +64,11 @@ default_engine = couch
; move deleted databases/shards there instead. You can then manually delete
; these files later, as desired.
;enable_database_recovery = false
+;
+; Set the maximum size allowed for a partition. This helps users avoid
+; inadvertently abusing partitions resulting in hot shards. The default
+; is 10GiB. A value of 0 or less will disable partition size checks.
+;max_partition_size = 10737418240
[couchdb_engines]
; The keys in this section are the filename extension that
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 2f241cdad..0b3349a24 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -873,6 +873,11 @@ error_info(conflict) ->
{409, <<"conflict">>, <<"Document update conflict.">>};
error_info({conflict, _}) ->
{409, <<"conflict">>, <<"Document update conflict.">>};
+error_info({partition_overflow, DocId}) ->
+ Descr = <<
+ "Partition limit exceeded due to update on '", DocId/binary, "'"
+ >>,
+ {403, <<"partition_overflow">>, Descr};
error_info({{not_found, missing}, {_, _}}) ->
{409, <<"not_found">>, <<"missing_rev">>};
error_info({forbidden, Error, Msg}) ->
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 95508e248..4227ff036 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -21,6 +21,7 @@
-include("couch_db_int.hrl").
-define(IDLE_LIMIT_DEFAULT, 61000).
+-define(DEFAULT_MAX_PARTITION_SIZE, 16#280000000). % 10 GiB
-record(merge_acc, {
@@ -28,7 +29,8 @@
merge_conflicts,
add_infos = [],
rem_seqs = [],
- cur_seq
+ cur_seq,
+ full_partitions = []
}).
@@ -466,13 +468,22 @@ merge_rev_trees([], [], Acc) ->
merge_rev_trees([NewDocs | RestDocsList], [OldDocInfo | RestOldInfo], Acc) ->
#merge_acc{
revs_limit = Limit,
- merge_conflicts = MergeConflicts
+ merge_conflicts = MergeConflicts,
+ full_partitions = FullPartitions
} = Acc,
% Track doc ids so we can debug large revision trees
erlang:put(last_id_merged, OldDocInfo#full_doc_info.id),
NewDocInfo0 = lists:foldl(fun({Client, NewDoc}, OldInfoAcc) ->
- merge_rev_tree(OldInfoAcc, NewDoc, Client, MergeConflicts)
+ NewInfo = merge_rev_tree(OldInfoAcc, NewDoc, Client, MergeConflicts),
+ case is_overflowed(NewInfo, OldInfoAcc, FullPartitions) of
+ true when not MergeConflicts ->
+ DocId = NewInfo#full_doc_info.id,
+ send_result(Client, NewDoc, {partition_overflow, DocId}),
+ OldInfoAcc;
+ _ ->
+ NewInfo
+ end
end, OldDocInfo, NewDocs),
NewDocInfo1 = maybe_stem_full_doc_info(NewDocInfo0, Limit),
% When MergeConflicts is false, we updated #full_doc_info.deleted on every
@@ -595,6 +606,24 @@ merge_rev_tree(OldInfo, NewDoc, _Client, true) ->
{NewTree, _} = couch_key_tree:merge(OldTree, NewTree0),
OldInfo#full_doc_info{rev_tree = NewTree}.
+%% Decide whether merging NewInfo over OldInfo has to be refused because the
+%% document lives in a partition listed in FullPartitions.
+%%
+%% Returns false when no partition is full, when the merge left the
+%% #full_doc_info{} untouched, or when the id starts with "_design/".
+%% Otherwise returns true only if the document's partition is in the list
+%% AND the update makes the document's estimated size grow, so deletes and
+%% shrinking updates are still let through.
+is_overflowed(_NewInfo, _OldInfo, []) ->
+    false;
+is_overflowed(Unchanged, Unchanged, _FullPartitions) ->
+    false;
+is_overflowed(#full_doc_info{id = <<"_design/", _/binary>>}, _OldInfo, _FullPartitions) ->
+    false;
+is_overflowed(#full_doc_info{id = DocId} = NewInfo, OldInfo, FullPartitions) ->
+    Partition = couch_partition:from_docid(DocId),
+    lists:member(Partition, FullPartitions)
+        andalso estimate_size(NewInfo) > estimate_size(OldInfo).
+
maybe_stem_full_doc_info(#full_doc_info{rev_tree = Tree} = Info, Limit) ->
case config:get_boolean("couchdb", "stem_interactive_updates", true) of
true ->
@@ -617,13 +646,34 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
(Id, not_found) ->
#full_doc_info{id=Id}
end, Ids, OldDocLookups),
+
+ %% Get the list of full partitions
+ FullPartitions = case couch_db:is_partitioned(Db) of
+ true ->
+ case max_partition_size() of
+ N when N =< 0 ->
+ [];
+ Max ->
+ Partitions = lists:usort(lists:flatmap(fun(Id) ->
+ case couch_partition:extract(Id) of
+ undefined -> [];
+ {Partition, _} -> [Partition]
+ end
+ end, Ids)),
+ [P || P <- Partitions, partition_size(Db, P) >= Max]
+ end;
+ false ->
+ []
+ end,
+
% Merge the new docs into the revision trees.
AccIn = #merge_acc{
revs_limit = RevsLimit,
merge_conflicts = MergeConflicts,
add_infos = [],
rem_seqs = [],
- cur_seq = UpdateSeq
+ cur_seq = UpdateSeq,
+ full_partitions = FullPartitions
},
{ok, AccOut} = merge_rev_trees(DocsList, OldDocInfos, AccIn),
#merge_acc{
@@ -685,6 +735,40 @@ increment_local_doc_revs(#doc{revs = {0, [RevStr | _]}} = Doc) ->
increment_local_doc_revs(#doc{}) ->
{error, <<"Invalid rev format">>}.
+%% Read the "max_partition_size" setting from the "couchdb" config section,
+%% falling back to ?DEFAULT_MAX_PARTITION_SIZE when it is not set.
+max_partition_size() ->
+    Fallback = ?DEFAULT_MAX_PARTITION_SIZE,
+    config:get_integer("couchdb", "max_partition_size", Fallback).
+
+%% Look up the `external` entry under `sizes` in the partition info that
+%% couch_db:get_partition_info/2 reports for Partition.
+%% Crashes with badmatch if the lookup does not return {ok, Info}.
+partition_size(Db, Partition) ->
+    {ok, PartitionInfo} = couch_db:get_partition_info(Db, Partition),
+    SizeProps = couch_util:get_value(sizes, PartitionInfo),
+    couch_util:get_value(external, SizeProps).
+
+%% Estimate the size of a document from the leaves of its revision tree.
+%%
+%% Every leaf is handed to add_sizes/3 (defined elsewhere in this module);
+%% branch nodes are ignored. A leaf still held as a #doc{} is first wrapped
+%% in a #leaf{} built from its meta: the body size comes from
+%% get_meta_body_size/1 and the attachment info from the `size_info` meta
+%% entry (which must be present, otherwise this crashes with badmatch).
+%%
+%% The result is the second element of the folded accumulator plus the sum
+%% of the second elements of the {_, Size} pairs in its third element.
+estimate_size(#full_doc_info{rev_tree = Tree}) ->
+    CollectLeaf = fun
+        (_Rev, Node, leaf, Acc) ->
+            case Node of
+                #doc{meta = Meta} ->
+                    BodySize = get_meta_body_size(Meta),
+                    {size_info, AttInfo} = lists:keyfind(size_info, 1, Meta),
+                    Pending = #leaf{
+                        sizes = #size_info{external = BodySize},
+                        atts = AttInfo
+                    },
+                    add_sizes(leaf, Pending, Acc);
+                #leaf{} ->
+                    add_sizes(leaf, Node, Acc)
+            end;
+        (_Rev, _Node, branch, Acc) ->
+            Acc
+    end,
+    {_, ExternalTotal, Atts} = couch_key_tree:fold(CollectLeaf, {0, 0, []}, Tree),
+    AttTotal = lists:sum(lists:map(fun({_, Size}) -> Size end, Atts)),
+    ExternalTotal + AttTotal.
purge_docs(Db, []) ->
{ok, Db, []};
diff --git a/test/elixir/test/partition_size_limit_test.exs b/test/elixir/test/partition_size_limit_test.exs
new file mode 100644
index 000000000..b4be6480e
--- /dev/null
+++ b/test/elixir/test/partition_size_limit_test.exs
@@ -0,0 +1,305 @@
+defmodule PartitionSizeLimitTest do
+  use CouchTestCase
+
+  @moduledoc """
+  Test Partition size limit functionality
+  """
+
+  # Value written to couchdb/max_partition_size for every test in this module.
+  # Deliberately tiny so that a handful of ~1 KiB documents fills a partition.
+  @max_size 10_240
+
+  # Create a fresh partitioned database (q=1) per test, schedule its removal,
+  # and lower the partition size limit to @max_size.
+  setup do
+    db_name = random_db_name()
+    {:ok, _} = create_db(db_name, query: %{partitioned: true, q: 1})
+    on_exit(fn -> delete_db(db_name) end)
+
+    set_config({"couchdb", "max_partition_size", Integer.to_string(@max_size)})
+
+    {:ok, [db_name: db_name]}
+  end
+
+  # GET /dbname, assert 200, return the decoded body.
+  defp get_db_info(dbname) do
+    resp = Couch.get("/#{dbname}")
+    assert resp.status_code == 200
+    %{:body => body} = resp
+    body
+  end
+
+  # GET /dbname/_partition/partition, assert 200, return the decoded body.
+  defp get_partition_info(dbname, partition) do
+    resp = Couch.get("/#{dbname}/_partition/#{partition}")
+    assert resp.status_code == 200
+    %{:body => body} = resp
+    body
+  end
+
+  # GET a document, assert the expected status, return the decoded body.
+  defp open_doc(db_name, docid, status_assert \\ 200) do
+    resp = Couch.get("/#{db_name}/#{docid}")
+    assert resp.status_code == status_assert
+    %{:body => body} = resp
+    body
+  end
+
+  # POST a document with w=3, assert the expected status, return the "rev"
+  # field of the response body (nil when the response carries none).
+  defp save_doc(db_name, doc, status_assert \\ 201) do
+    resp = Couch.post("/#{db_name}", query: [w: 3], body: doc)
+    assert resp.status_code == status_assert
+    %{:body => body} = resp
+    body["rev"]
+  end
+
+  # DELETE the given document at its current "_rev" with w=3, assert the
+  # expected status, return the "rev" field of the response body.
+  defp delete_doc(db_name, doc, status_assert \\ 200) do
+    url = "/#{db_name}/#{doc["_id"]}"
+    rev = doc["_rev"]
+    resp = Couch.delete(url, query: [w: 3, rev: rev])
+    assert resp.status_code == status_assert
+    %{:body => body} = resp
+    body["rev"]
+  end
+
+  # Bulk-write 15 documents "<partition>:0001" .. "<partition>:0015", each with
+  # a 1024-character value, in a single _bulk_docs request. The tests below
+  # rely on this pushing the partition past @max_size.
+  defp fill_partition(db_name, partition \\ "foo") do
+    docs =
+      1..15
+      |> Enum.map(fn i ->
+        id = i |> Integer.to_string() |> String.pad_leading(4, "0")
+        docid = "#{partition}:#{id}"
+        %{_id: docid, value: "0" |> String.pad_leading(1024)}
+      end)
+
+    body = %{:w => 3, :docs => docs}
+    resp = Couch.post("/#{db_name}/_bulk_docs", body: body)
+    assert resp.status_code == 201
+  end
+
+  # Trigger compaction and wait until the database no longer reports
+  # compact_running. The numeric arguments are passed straight to
+  # retry_until (presumably sleep and timeout in ms -- confirm in
+  # CouchTestCase).
+  defp compact(db) do
+    assert Couch.post("/#{db}/_compact").status_code == 202
+
+    retry_until(
+      fn ->
+        Couch.get("/#{db}").body["compact_running"] == false
+      end,
+      200,
+      20_000
+    )
+  end
+
+  test "fill partition manually", context do
+    db_name = context[:db_name]
+    partition = "foo"
+
+    # Write one document at a time until a write is not answered with 201;
+    # find_value returns that first non-201 response, or nil if every write
+    # was accepted.
+    resp =
+      1..1000
+      |> Enum.find_value(fn i ->
+        id = i |> Integer.to_string() |> String.pad_leading(4, "0")
+        docid = "#{partition}:#{id}"
+        doc = %{_id: docid, value: "0" |> String.pad_leading(1024)}
+        write_resp = Couch.post("/#{db_name}", query: [w: 3], body: doc)
+
+        if write_resp.status_code == 201 do
+          false
+        else
+          write_resp
+        end
+      end)
+
+    # Fail with a readable message instead of crashing on a non-response
+    # value when the limit was never enforced.
+    assert resp != nil, "no write was rejected after 1000 documents"
+
+    assert resp.status_code == 403
+    %{body: body} = resp
+    assert body["error"] == "partition_overflow"
+
+    info = get_partition_info(db_name, partition)
+    assert info["sizes"]["external"] >= @max_size
+  end
+
+  test "full partitions reject POST /dbname", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+
+    doc = %{_id: "foo:bar", value: "stuff"}
+    resp = Couch.post("/#{db_name}", query: [w: 3], body: doc)
+    assert resp.status_code == 403
+    %{body: body} = resp
+    assert body["error"] == "partition_overflow"
+  end
+
+  test "full partitions reject PUT /dbname/docid", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+
+    doc = %{value: "stuff"}
+    resp = Couch.put("/#{db_name}/foo:bar", query: [w: 3], body: doc)
+    assert resp.status_code == 403
+    %{body: body} = resp
+    assert body["error"] == "partition_overflow"
+  end
+
+  test "full partitions reject POST /dbname/_bulk_docs", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+
+    # The bulk request itself succeeds (201); the rejection is reported in
+    # the per-document result.
+    body = %{w: 3, docs: [%{_id: "foo:bar"}]}
+    resp = Couch.post("/#{db_name}/_bulk_docs", query: [w: 3], body: body)
+    assert resp.status_code == 201
+    %{body: body} = resp
+    doc_resp = Enum.at(body, 0)
+    assert doc_resp["error"] == "partition_overflow"
+  end
+
+  test "full partitions with mixed POST /dbname/_bulk_docs", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+
+    # One document targets the full partition "foo", the other the empty
+    # partition "baz"; only the first must be rejected.
+    body = %{w: 3, docs: [%{_id: "foo:bar"}, %{_id: "baz:bang"}]}
+    resp = Couch.post("/#{db_name}/_bulk_docs", query: [w: 3], body: body)
+    assert resp.status_code == 201
+    %{body: body} = resp
+
+    doc_resp1 = Enum.at(body, 0)
+    assert doc_resp1["error"] == "partition_overflow"
+
+    doc_resp2 = Enum.at(body, 1)
+    assert doc_resp2["ok"]
+  end
+
+  test "full partitions are still readable", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+    open_doc(db_name, "foo:0001")
+  end
+
+  test "full partitions can accept deletes", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+
+    doc = open_doc(db_name, "foo:0001")
+    delete_doc(db_name, doc)
+  end
+
+  test "full partitions can accept updates that reduce size", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+
+    doc = open_doc(db_name, "foo:0001")
+    save_doc(db_name, %{doc | "value" => ""})
+  end
+
+  test "full partition does not affect other partitions", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+    save_doc(db_name, %{_id: "bar:foo", value: "stuff"})
+  end
+
+  test "full partition does not affect design documents", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+    rev1 = save_doc(db_name, %{_id: "_design/foo", value: "stuff"})
+    save_doc(db_name, %{_id: "_design/foo", _rev: rev1, value: "hi"})
+    doc = open_doc(db_name, "_design/foo")
+    delete_doc(db_name, doc)
+  end
+
+  test "replication into a full partition works", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+    save_doc(db_name, %{_id: "foo:bar", value: "stuff"}, 403)
+
+    # The same document is accepted when written with new_edits=false and an
+    # explicit revision, which is how this test simulates a replicated write.
+    doc = %{
+      _id: "foo:bar",
+      _rev: <<"1-23202479633c2b380f79507a776743d5">>,
+      value: "stuff"
+    }
+
+    url = "/#{db_name}/#{doc[:_id]}"
+    query = [new_edits: false, w: 3]
+    resp = Couch.put(url, query: query, body: doc)
+    assert resp.status_code == 201
+  end
+
+  test "compacting a full partition works", context do
+    db_name = context[:db_name]
+    db_info1 = get_db_info(db_name)
+    fill_partition(db_name)
+    compact(db_name)
+    db_info2 = get_db_info(db_name)
+    assert db_info2["sizes"]["file"] != db_info1["sizes"]["file"]
+  end
+
+  test "indexing a full partition works", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+
+    ddoc = %{
+      _id: "_design/foo",
+      views: %{
+        bar: %{
+          map: "function(doc) {emit(doc.group, 1);}"
+        }
+      }
+    }
+
+    save_doc(db_name, ddoc)
+
+    url = "/#{db_name}/_partition/foo/_design/foo/_view/bar"
+    resp = Couch.get(url)
+    assert resp.status_code == 200
+    %{body: body} = resp
+
+    assert length(body["rows"]) > 0
+  end
+
+  test "purging docs allows writes", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+
+    # Purge all but one of the documents in the partition.
+    info = get_partition_info(db_name, "foo")
+    limit = info["doc_count"] - 1
+
+    query = [
+      start_key: "\"foo:0000\"",
+      end_key: "\"foo:9999\"",
+      limit: limit
+    ]
+
+    resp = Couch.get("/#{db_name}/_all_docs", query: query)
+    assert resp.status_code == 200
+    %{body: body} = resp
+
+    # Build the _purge request body: doc id => [current rev].
+    pbody =
+      body["rows"]
+      |> Enum.reduce(%{}, fn row, acc ->
+        Map.put(acc, row["id"], [row["value"]["rev"]])
+      end)
+
+    resp = Couch.post("/#{db_name}/_purge", query: [w: 3], body: pbody)
+    assert resp.status_code == 201
+
+    save_doc(db_name, %{_id: "foo:bar", value: "some value"})
+  end
+
+  test "increasing partition size allows more writes", context do
+    db_name = context[:db_name]
+    fill_partition(db_name)
+
+    # We use set_config_raw so that we're not setting
+    # on_exit handlers that might interfere with the original
+    # config change done in setup of this test
+    new_size = Integer.to_string(@max_size * 1000)
+    set_config_raw("couchdb", "max_partition_size", new_size)
+
+    save_doc(db_name, %{_id: "foo:bar", value: "stuff"})
+  end
+
+  test "decreasing partition size disables more writes", context do
+    db_name = context[:db_name]
+
+    # We use set_config_raw so that we're not setting
+    # on_exit handlers that might interfere with the original
+    # config change done in setup of this test
+    new_size = Integer.to_string(@max_size * 1000)
+    set_config_raw("couchdb", "max_partition_size", new_size)
+
+    fill_partition(db_name)
+    save_doc(db_name, %{_id: "foo:bar", value: "stuff"})
+
+    # Restoring the small limit must make the already-written data count as
+    # a full partition again.
+    old_size = Integer.to_string(@max_size)
+    set_config_raw("couchdb", "max_partition_size", old_size)
+
+    save_doc(db_name, %{_id: "foo:baz", value: "stuff"}, 403)
+  end
+end