summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRussell Branca <chewbranca@apache.org>2021-07-08 13:51:07 -0700
committerGitHub <noreply@github.com>2021-07-08 13:51:07 -0700
commitd043258a3e337586fdd003ca2df9a5afeba0afa6 (patch)
tree181db2b417009e72e0cebf5e8c37b68580901fa2
parent3675e5ecf71e2f749404c82012a4804d02894701 (diff)
downloadcouchdb-d043258a3e337586fdd003ca2df9a5afeba0afa6.tar.gz
Fix partitioned db shard recreation logic (#3633)
-rw-r--r--src/dreyfus/src/dreyfus_rpc.erl11
-rw-r--r--src/fabric/src/fabric_rpc.erl4
-rw-r--r--src/fabric/src/fabric_util.erl2
-rw-r--r--src/mem3/src/mem3_rpc.erl2
-rw-r--r--src/mem3/src/mem3_shards.erl14
-rw-r--r--src/mem3/src/mem3_util.erl62
-rw-r--r--src/mem3/test/eunit/mem3_shards_test.erl129
7 files changed, 203 insertions, 21 deletions
diff --git a/src/dreyfus/src/dreyfus_rpc.erl b/src/dreyfus/src/dreyfus_rpc.erl
index 5542bd029..cc50d0999 100644
--- a/src/dreyfus/src/dreyfus_rpc.erl
+++ b/src/dreyfus/src/dreyfus_rpc.erl
@@ -103,13 +103,14 @@ disk_size(DbName, DDoc, IndexName) ->
get_or_create_db(DbName, Options) ->
case couch_db:open_int(DbName, Options) of
- {not_found, no_db_file} ->
- couch_log:warning("~p creating ~s", [?MODULE, DbName]),
- couch_server:create(DbName, Options);
- Else ->
- Else
+ {not_found, no_db_file} ->
+ couch_log:warning("~p creating ~s", [?MODULE, DbName]),
+ mem3_util:get_or_create_db(DbName, Options);
+ Else ->
+ Else
end.
+
calculate_seqs(Db, Stale) ->
LastSeq = couch_db:get_update_seq(Db),
if
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 7776bd9fe..9ed8efd14 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -304,7 +304,7 @@ reset_validation_funs(DbName) ->
open_shard(Name, Opts) ->
set_io_priority(Name, Opts),
try
- rexi:reply(couch_db:open(Name, Opts))
+ rexi:reply(mem3_util:get_or_create_db(Name, Opts))
catch exit:{timeout, _} ->
couch_stats:increment_counter([fabric, open_shard, timeouts])
end.
@@ -439,7 +439,7 @@ get_node_seqs(Db, Nodes) ->
get_or_create_db(DbName, Options) ->
- mem3_util:get_or_create_db(DbName, Options).
+ mem3_util:get_or_create_db_int(DbName, Options).
get_view_cb(#mrargs{extra = Options}) ->
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 8aa14e73a..84ffef122 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -105,7 +105,7 @@ get_db(DbName, Options) ->
Nodes = [node()|erlang:nodes()],
Live = [S || #shard{node = N} = S <- Shards, lists:member(N, Nodes)],
Factor = list_to_integer(config:get("fabric", "shard_timeout_factor", "2")),
- get_shard(Live, [{create_if_missing, true} | Options], 100, Factor).
+ get_shard(Live, Options, 100, Factor).
get_shard([], _Opts, _Timeout, _Factor) ->
erlang:error({internal_server_error, "No DB shards could be opened."});
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index 5d1c62c06..9e0f42a8e 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -401,7 +401,7 @@ rexi_call(Node, MFA, Timeout) ->
get_or_create_db(DbName, Options) ->
- mem3_util:get_or_create_db(DbName, Options).
+ mem3_util:get_or_create_db_int(DbName, Options).
-ifdef(TEST).
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index 4f3323740..fd1894abe 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -46,7 +46,8 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-opts_for_db(DbName) ->
+opts_for_db(DbName0) ->
+ DbName = mem3:dbname(DbName0),
{ok, Db} = mem3_util:ensure_exists(mem3_sync:shards_db()),
case couch_db:open_doc(Db, DbName, [ejson_body]) of
{ok, #doc{body = {Props}}} ->
@@ -358,7 +359,7 @@ changes_callback({change, {Change}, _}, _) ->
ets:insert(?OPENERS, {DbName, Writer}),
Msg = {cache_insert_change, DbName, Writer, Seq},
gen_server:cast(?MODULE, Msg),
- [create_if_missing(mem3:name(S), mem3:engine(S)) || S
+ [create_if_missing(mem3:name(S)) || S
<- Shards, mem3:node(S) =:= node()]
end
end
@@ -408,17 +409,18 @@ in_range(Shard, HashKey) ->
[B, E] = mem3:range(Shard),
B =< HashKey andalso HashKey =< E.
-create_if_missing(Name, Options) ->
- case couch_server:exists(Name) of
+create_if_missing(ShardName) ->
+ case couch_server:exists(ShardName) of
true ->
ok;
false ->
- case couch_server:create(Name, [?ADMIN_CTX] ++ Options) of
+ Options = opts_for_db(ShardName),
+ case couch_server:create(ShardName, [?ADMIN_CTX] ++ Options) of
{ok, Db} ->
couch_db:close(Db);
Error ->
couch_log:error("~p tried to create ~s, got ~p",
- [?MODULE, Name, Error])
+ [?MODULE, ShardName, Error])
end
end.
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
index b63b2d448..005a6b1bc 100644
--- a/src/mem3/src/mem3_util.erl
+++ b/src/mem3/src/mem3_util.erl
@@ -14,7 +14,8 @@
-export([name_shard/2, create_partition_map/5, build_shards/2,
n_val/2, q_val/1, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
- shard_info/1, ensure_exists/1, open_db_doc/1, get_or_create_db/2]).
+ shard_info/1, ensure_exists/1, open_db_doc/1, update_db_doc/1]).
+-export([get_or_create_db/2, get_or_create_db_int/2]).
-export([is_deleted/1, rotate_list/2]).
-export([get_shard_opts/1, get_engine_opt/1, get_props_opt/1]).
-export([get_shard_props/1, find_dirty_shards/0]).
@@ -116,6 +117,34 @@ write_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) ->
couch_db:close(Db)
end.
+update_db_doc(Doc) ->
+ update_db_doc(mem3_sync:shards_db(), Doc, true).
+
+update_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) ->
+ {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+ try couch_db:open_doc(Db, Id, [ejson_body]) of
+ {ok, #doc{body = Body}} ->
+ % the doc is already in the desired state, we're done here
+ ok;
+ {ok, #doc{body = Body1}} ->
+ % the doc has a new body to be written
+ {ok, _} = couch_db:update_doc(Db, Doc#doc{body=Body1}, []),
+ ok;
+ {not_found, _} when ShouldMutate ->
+ try couch_db:update_doc(Db, Doc, []) of
+ {ok, _} ->
+ ok
+ catch conflict ->
+ % check to see if this was a replication race or a different edit
+ update_db_doc(DbName, Doc, false)
+ end;
+ _ ->
+ % the doc already exists in a different state
+ conflict
+ after
+ couch_db:close(Db)
+ end.
+
delete_db_doc(DocId) ->
gen_server:cast(mem3_shards, {cache_remove, DocId}),
delete_db_doc(mem3_sync:shards_db(), DocId, true).
@@ -508,18 +537,39 @@ sort_ranges_fun({B1, _}, {B2, _}) ->
B1 =< B2.
+add_db_config_options(DbName, Options) ->
+ DbOpts = case mem3:dbname(DbName) of
+ DbName -> [];
+ MDbName -> mem3_shards:opts_for_db(MDbName)
+ end,
+ merge_opts(DbOpts, Options).
+
+
get_or_create_db(DbName, Options) ->
+ case couch_db:open(DbName, Options) of
+ {ok, _} = OkDb ->
+ OkDb;
+ {not_found, no_db_file} ->
+ try
+ Options1 = [{create_if_missing, true} | Options],
+ Options2 = add_db_config_options(DbName, Options1),
+ couch_db:open(DbName, Options2)
+ catch error:database_does_not_exist ->
+ throw({error, missing_target})
+ end;
+ Else ->
+ Else
+ end.
+
+
+get_or_create_db_int(DbName, Options) ->
case couch_db:open_int(DbName, Options) of
{ok, _} = OkDb ->
OkDb;
{not_found, no_db_file} ->
try
- DbOpts = case mem3:dbname(DbName) of
- DbName -> [];
- MDbName -> mem3_shards:opts_for_db(MDbName)
- end,
Options1 = [{create_if_missing, true} | Options],
- Options2 = merge_opts(DbOpts, Options1),
+ Options2 = add_db_config_options(DbName, Options1),
couch_db:open_int(DbName, Options2)
catch error:database_does_not_exist ->
throw({error, missing_target})
diff --git a/src/mem3/test/eunit/mem3_shards_test.erl b/src/mem3/test/eunit/mem3_shards_test.erl
new file mode 100644
index 000000000..9c9bbb402
--- /dev/null
+++ b/src/mem3/test/eunit/mem3_shards_test.erl
@@ -0,0 +1,129 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_shards_test).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/src/mem3_reshard.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl"). % for all_docs function
+
+-define(ID, <<"_id">>).
+-define(TIMEOUT, 60).
+
+setup() ->
+ DbName = ?tempdb(),
+ PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}],
+ create_db(DbName, [{q, 8}, {n, 1}, {props, PartProps}]),
+ {ok, DbDoc} = mem3_util:open_db_doc(DbName),
+ #{dbname => DbName, dbdoc => DbDoc}.
+
+
+teardown(#{dbname := DbName}) ->
+ delete_db(DbName).
+
+
+start_couch() ->
+ test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]).
+
+
+stop_couch(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+mem3_shards_db_create_props_test_() ->
+ {
+ "mem3 shards partition query database properties tests",
+ {
+ setup,
+ fun start_couch/0, fun stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun partitioned_shards_recreated_properly/1
+ ]
+ }
+ }
+ }.
+
+
+% This asserts that when the mem3_shards's changes listener on the shards db
+% encounters a db doc update for a db that has a missing shard on the local
+% instance, the shard creation logic will properly propagate the db's config
+% properties.
+% SEE: apache/couchdb#3631
+partitioned_shards_recreated_properly(#{dbname := DbName, dbdoc := DbDoc}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ #doc{body = {Body0}} = DbDoc,
+ Body1 = [{<<"foo">>, <<"bar">>} | Body0],
+ Shards = [Shard|_] = lists:sort(mem3:shards(DbName)),
+ ShardName = Shard#shard.name,
+ ?assert(is_partitioned(Shards)),
+ ok = with_proc(fun() -> couch_server:delete(ShardName, []) end),
+ ?assertThrow({not_found, no_db_file}, is_partitioned(Shard)),
+ ok = mem3_util:update_db_doc(DbDoc#doc{body = {Body1}}),
+ Shards = [Shard|_] = test_util:wait_value(fun() ->
+ lists:sort(mem3:shards(DbName))
+ end, Shards),
+ ?assertEqual(true, test_util:wait_value(fun() ->
+ catch is_partitioned(Shard)
+ end, true))
+ end)}.
+
+
+is_partitioned([#shard{}|_]=Shards) ->
+ lists:all(fun is_partitioned/1, Shards);
+is_partitioned(#shard{name=Name}) ->
+ couch_util:with_db(Name, fun couch_db:is_partitioned/1);
+is_partitioned(Db) ->
+ couch_db:is_partitioned(Db).
+
+
+create_db(DbName, Opts) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL).
+
+
+delete_db(DbName) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL).
+
+
+with_proc(Fun) ->
+ with_proc(Fun, undefined, 30000).
+
+
+with_proc(Fun, GroupLeader) ->
+ with_proc(Fun, GroupLeader, 30000).
+
+
+with_proc(Fun, GroupLeader, Timeout) ->
+ {Pid, Ref} = spawn_monitor(fun() ->
+ case GroupLeader of
+ undefined -> ok;
+ _ -> erlang:group_leader(GroupLeader, self())
+ end,
+ exit({with_proc_res, Fun()})
+ end),
+ receive
+ {'DOWN', Ref, process, Pid, {with_proc_res, Res}} ->
+ Res;
+ {'DOWN', Ref, process, Pid, Error} ->
+ error(Error)
+ after Timeout ->
+ erlang:demonitor(Ref, [flush]),
+ exit(Pid, kill),
+ error({with_proc_timeout, Fun, Timeout})
+ end.
+