diff options
author | Russell Branca <chewbranca@apache.org> | 2021-07-08 13:51:07 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-07-08 13:51:07 -0700 |
commit | d043258a3e337586fdd003ca2df9a5afeba0afa6 (patch) | |
tree | 181db2b417009e72e0cebf5e8c37b68580901fa2 | |
parent | 3675e5ecf71e2f749404c82012a4804d02894701 (diff) | |
download | couchdb-d043258a3e337586fdd003ca2df9a5afeba0afa6.tar.gz |
Fix partitioned db shard recreation logic (#3633)
-rw-r--r-- | src/dreyfus/src/dreyfus_rpc.erl | 11 | ||||
-rw-r--r-- | src/fabric/src/fabric_rpc.erl | 4 | ||||
-rw-r--r-- | src/fabric/src/fabric_util.erl | 2 | ||||
-rw-r--r-- | src/mem3/src/mem3_rpc.erl | 2 | ||||
-rw-r--r-- | src/mem3/src/mem3_shards.erl | 14 | ||||
-rw-r--r-- | src/mem3/src/mem3_util.erl | 62 | ||||
-rw-r--r-- | src/mem3/test/eunit/mem3_shards_test.erl | 129 |
7 files changed, 203 insertions, 21 deletions
diff --git a/src/dreyfus/src/dreyfus_rpc.erl b/src/dreyfus/src/dreyfus_rpc.erl index 5542bd029..cc50d0999 100644 --- a/src/dreyfus/src/dreyfus_rpc.erl +++ b/src/dreyfus/src/dreyfus_rpc.erl @@ -103,13 +103,14 @@ disk_size(DbName, DDoc, IndexName) -> get_or_create_db(DbName, Options) -> case couch_db:open_int(DbName, Options) of - {not_found, no_db_file} -> - couch_log:warning("~p creating ~s", [?MODULE, DbName]), - couch_server:create(DbName, Options); - Else -> - Else + {not_found, no_db_file} -> + couch_log:warning("~p creating ~s", [?MODULE, DbName]), + mem3_util:get_or_create_db(DbName, Options); + Else -> + Else end. + calculate_seqs(Db, Stale) -> LastSeq = couch_db:get_update_seq(Db), if diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 7776bd9fe..9ed8efd14 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -304,7 +304,7 @@ reset_validation_funs(DbName) -> open_shard(Name, Opts) -> set_io_priority(Name, Opts), try - rexi:reply(couch_db:open(Name, Opts)) + rexi:reply(mem3_util:get_or_create_db(Name, Opts)) catch exit:{timeout, _} -> couch_stats:increment_counter([fabric, open_shard, timeouts]) end. @@ -439,7 +439,7 @@ get_node_seqs(Db, Nodes) -> get_or_create_db(DbName, Options) -> - mem3_util:get_or_create_db(DbName, Options). + mem3_util:get_or_create_db_int(DbName, Options). get_view_cb(#mrargs{extra = Options}) -> diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 8aa14e73a..84ffef122 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -105,7 +105,7 @@ get_db(DbName, Options) -> Nodes = [node()|erlang:nodes()], Live = [S || #shard{node = N} = S <- Shards, lists:member(N, Nodes)], Factor = list_to_integer(config:get("fabric", "shard_timeout_factor", "2")), - get_shard(Live, [{create_if_missing, true} | Options], 100, Factor). + get_shard(Live, Options, 100, Factor). get_shard([], _Opts, _Timeout, _Factor) -> erlang:error({internal_server_error, "No DB shards could be opened."}); diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index 5d1c62c06..9e0f42a8e 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -401,7 +401,7 @@ rexi_call(Node, MFA, Timeout) -> get_or_create_db(DbName, Options) -> - mem3_util:get_or_create_db(DbName, Options). + mem3_util:get_or_create_db_int(DbName, Options). -ifdef(TEST). diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl index 4f3323740..fd1894abe 100644 --- a/src/mem3/src/mem3_shards.erl +++ b/src/mem3/src/mem3_shards.erl @@ -46,7 +46,8 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -opts_for_db(DbName) -> +opts_for_db(DbName0) -> + DbName = mem3:dbname(DbName0), {ok, Db} = mem3_util:ensure_exists(mem3_sync:shards_db()), case couch_db:open_doc(Db, DbName, [ejson_body]) of {ok, #doc{body = {Props}}} -> @@ -358,7 +359,7 @@ changes_callback({change, {Change}, _}, _) -> ets:insert(?OPENERS, {DbName, Writer}), Msg = {cache_insert_change, DbName, Writer, Seq}, gen_server:cast(?MODULE, Msg), - [create_if_missing(mem3:name(S), mem3:engine(S)) || S + [create_if_missing(mem3:name(S)) || S <- Shards, mem3:node(S) =:= node()] end end @@ -408,17 +409,18 @@ in_range(Shard, HashKey) -> [B, E] = mem3:range(Shard), B =< HashKey andalso HashKey =< E. -create_if_missing(Name, Options) -> - case couch_server:exists(Name) of +create_if_missing(ShardName) -> + case couch_server:exists(ShardName) of true -> ok; false -> - case couch_server:create(Name, [?ADMIN_CTX] ++ Options) of + Options = opts_for_db(ShardName), + case couch_server:create(ShardName, [?ADMIN_CTX] ++ Options) of {ok, Db} -> couch_db:close(Db); Error -> couch_log:error("~p tried to create ~s, got ~p", - [?MODULE, Name, Error]) + [?MODULE, ShardName, Error]) end end. diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl index b63b2d448..005a6b1bc 100644 --- a/src/mem3/src/mem3_util.erl +++ b/src/mem3/src/mem3_util.erl @@ -14,7 +14,8 @@ -export([name_shard/2, create_partition_map/5, build_shards/2, n_val/2, q_val/1, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1, - shard_info/1, ensure_exists/1, open_db_doc/1, get_or_create_db/2]). + shard_info/1, ensure_exists/1, open_db_doc/1, update_db_doc/1]). +-export([get_or_create_db/2, get_or_create_db_int/2]). -export([is_deleted/1, rotate_list/2]). -export([get_shard_opts/1, get_engine_opt/1, get_props_opt/1]). -export([get_shard_props/1, find_dirty_shards/0]). @@ -116,6 +117,34 @@ write_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) -> couch_db:close(Db) end. +update_db_doc(Doc) -> + update_db_doc(mem3_sync:shards_db(), Doc, true). + +update_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) -> + {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]), + try couch_db:open_doc(Db, Id, [ejson_body]) of + {ok, #doc{body = Body}} -> + % the doc is already in the desired state, we're done here + ok; + {ok, #doc{body = Body1}} -> + % the doc has a new body to be written + {ok, _} = couch_db:update_doc(Db, Doc#doc{body=Body1}, []), + ok; + {not_found, _} when ShouldMutate -> + try couch_db:update_doc(Db, Doc, []) of + {ok, _} -> + ok + catch conflict -> + % check to see if this was a replication race or a different edit + update_db_doc(DbName, Doc, false) + end; + _ -> + % the doc already exists in a different state + conflict + after + couch_db:close(Db) + end. + delete_db_doc(DocId) -> gen_server:cast(mem3_shards, {cache_remove, DocId}), delete_db_doc(mem3_sync:shards_db(), DocId, true). @@ -508,18 +537,39 @@ sort_ranges_fun({B1, _}, {B2, _}) -> B1 =< B2. +add_db_config_options(DbName, Options) -> + DbOpts = case mem3:dbname(DbName) of + DbName -> []; + MDbName -> mem3_shards:opts_for_db(MDbName) + end, + merge_opts(DbOpts, Options). + + get_or_create_db(DbName, Options) -> + case couch_db:open(DbName, Options) of + {ok, _} = OkDb -> + OkDb; + {not_found, no_db_file} -> + try + Options1 = [{create_if_missing, true} | Options], + Options2 = add_db_config_options(DbName, Options1), + couch_db:open(DbName, Options2) + catch error:database_does_not_exist -> + throw({error, missing_target}) + end; + Else -> + Else + end. + + +get_or_create_db_int(DbName, Options) -> case couch_db:open_int(DbName, Options) of {ok, _} = OkDb -> OkDb; {not_found, no_db_file} -> try - DbOpts = case mem3:dbname(DbName) of - DbName -> []; - MDbName -> mem3_shards:opts_for_db(MDbName) - end, Options1 = [{create_if_missing, true} | Options], - Options2 = merge_opts(DbOpts, Options1), + Options2 = add_db_config_options(DbName, Options1), couch_db:open_int(DbName, Options2) catch error:database_does_not_exist -> throw({error, missing_target}) diff --git a/src/mem3/test/eunit/mem3_shards_test.erl b/src/mem3/test/eunit/mem3_shards_test.erl new file mode 100644 index 000000000..9c9bbb402 --- /dev/null +++ b/src/mem3/test/eunit/mem3_shards_test.erl @@ -0,0 +1,129 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_shards_test). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/src/mem3_reshard.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). % for all_docs function + +-define(ID, <<"_id">>). +-define(TIMEOUT, 60). + +setup() -> + DbName = ?tempdb(), + PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}], + create_db(DbName, [{q, 8}, {n, 1}, {props, PartProps}]), + {ok, DbDoc} = mem3_util:open_db_doc(DbName), + #{dbname => DbName, dbdoc => DbDoc}. + + +teardown(#{dbname := DbName}) -> + delete_db(DbName). + + +start_couch() -> + test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]). + + +stop_couch(Ctx) -> + test_util:stop_couch(Ctx). + + +mem3_shards_db_create_props_test_() -> + { + "mem3 shards partition query database properties tests", + { + setup, + fun start_couch/0, fun stop_couch/1, + { + foreach, + fun setup/0, fun teardown/1, + [ + fun partitioned_shards_recreated_properly/1 + ] + } + } + }. + + +% This asserts that when the mem3_shards's changes listener on the shards db +% encounters a db doc update for a db that has a missing shard on the local +% instance, the shard creation logic will properly propagate the db's config +% properties. +% SEE: apache/couchdb#3631 +partitioned_shards_recreated_properly(#{dbname := DbName, dbdoc := DbDoc}) -> + {timeout, ?TIMEOUT, ?_test(begin + #doc{body = {Body0}} = DbDoc, + Body1 = [{<<"foo">>, <<"bar">>} | Body0], + Shards = [Shard|_] = lists:sort(mem3:shards(DbName)), + ShardName = Shard#shard.name, + ?assert(is_partitioned(Shards)), + ok = with_proc(fun() -> couch_server:delete(ShardName, []) end), + ?assertThrow({not_found, no_db_file}, is_partitioned(Shard)), + ok = mem3_util:update_db_doc(DbDoc#doc{body = {Body1}}), + Shards = [Shard|_] = test_util:wait_value(fun() -> + lists:sort(mem3:shards(DbName)) + end, Shards), + ?assertEqual(true, test_util:wait_value(fun() -> + catch is_partitioned(Shard) + end, true)) + end)}. + + +is_partitioned([#shard{}|_]=Shards) -> + lists:all(fun is_partitioned/1, Shards); +is_partitioned(#shard{name=Name}) -> + couch_util:with_db(Name, fun couch_db:is_partitioned/1); +is_partitioned(Db) -> + couch_db:is_partitioned(Db). + + +create_db(DbName, Opts) -> + GL = erlang:group_leader(), + with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL). + + +delete_db(DbName) -> + GL = erlang:group_leader(), + with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL). + + +with_proc(Fun) -> + with_proc(Fun, undefined, 30000). + + +with_proc(Fun, GroupLeader) -> + with_proc(Fun, GroupLeader, 30000). + + +with_proc(Fun, GroupLeader, Timeout) -> + {Pid, Ref} = spawn_monitor(fun() -> + case GroupLeader of + undefined -> ok; + _ -> erlang:group_leader(GroupLeader, self()) + end, + exit({with_proc_res, Fun()}) + end), + receive + {'DOWN', Ref, process, Pid, {with_proc_res, Res}} -> + Res; + {'DOWN', Ref, process, Pid, Error} -> + error(Error) + after Timeout -> + erlang:demonitor(Ref, [flush]), + exit(Pid, kill), + error({with_proc_timeout, Fun, Timeout}) + end. + |