diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-01-04 17:08:05 -0500 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2020-01-05 02:29:37 -0500 |
commit | 43fff603bcc29f5d4138e15b764fa83cc871ce92 (patch) | |
tree | 91b01e0e0cc36903a1e48108d496eef46a4d713b | |
parent | 9dedf6ad35b05536e8c413b5d3bc2737d1b81cf7 (diff) | |
download | couchdb-43fff603bcc29f5d4138e15b764fa83cc871ce92.tar.gz |
Lock shard splitting targets during the initial copy phase
During the initial copy phase target shards are opened outside the
couch_server. Previously, it was possible to manually (via remsh for instance)
open the same targets via the couch_server by using the `couch_db:open/2` API
for example. That could lead to corruption as there would be two writers for
the same DB file.
In order to prevent such a scenario, introduce a mechanism for the shard
splitter to lock the target shards such that any regular open call would fail
during the initial copy phase.
The locking mechanism is generic and would allow local locking of shards for
possibly other reasons in the future as well.
-rw-r--r-- | src/couch/src/couch_db_split.erl | 11 | ||||
-rw-r--r-- | src/couch/src/couch_server.erl | 39 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_db_split_tests.erl | 35 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_db_tests.erl | 29 | ||||
-rw-r--r-- | src/mem3/test/eunit/mem3_reshard_test.erl | 35 |
5 files changed, 141 insertions, 8 deletions
diff --git a/src/couch/src/couch_db_split.erl b/src/couch/src/couch_db_split.erl index 5bf98b6fd..3a1f98d3e 100644 --- a/src/couch/src/couch_db_split.erl +++ b/src/couch/src/couch_db_split.erl @@ -132,6 +132,12 @@ split(SourceDb, Partitioned, Engine, Targets0, PickFun, {M, F, A} = HashFun) -> {error, E} -> throw({target_create_error, DbName, E, Map}) end, + case couch_server:lock(DbName, <<"shard splitting">>) of + ok -> + ok; + {error, Err} -> + throw({target_create_error, DbName, Err, Map}) + end, {ok, Filepath} = couch_server:get_engine_path(DbName, Engine), Opts = [create, ?ADMIN_CTX] ++ case Partitioned of true -> [{props, [{partitioned, true}, {hash, [M, F, A]}]}]; @@ -164,7 +170,9 @@ split(SourceDb, Partitioned, Engine, Targets0, PickFun, {M, F, A} = HashFun) -> cleanup_targets(#{} = Targets, Engine) -> maps:map(fun(_, #target{db = Db} = T) -> ok = stop_target_db(Db), - delete_target(couch_db:name(Db), Engine), + DbName = couch_db:name(Db), + delete_target(DbName, Engine), + couch_server:unlock(DbName), T end, Targets). @@ -182,6 +190,7 @@ stop_target_db(Db) -> Pid = couch_db:get_pid(Db), catch unlink(Pid), catch exit(Pid, kill), + couch_server:unlock(couch_db:name(Db)), ok. diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index ab0122eec..eaca3ee9b 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -26,6 +26,7 @@ -export([exists/1]). -export([get_engine_extensions/0]). -export([get_engine_path/2]). +-export([lock/2, unlock/1]). % config_listener api -export([handle_config_change/5, handle_config_terminate/3]). @@ -77,8 +78,15 @@ get_stats() -> sup_start_link() -> gen_server:start_link({local, couch_server}, couch_server, [], []). +open(DbName, Options) -> + case ets:lookup(couch_dbs_locks, DbName) of + [] -> + open_int(DbName, Options); + [{DbName, Reason}] -> + {error, {locked, Reason}} + end. 
-open(DbName, Options0) -> +open_int(DbName, Options0) -> Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}), case ets:lookup(couch_dbs, DbName) of [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked -> @@ -115,7 +123,15 @@ update_lru(DbName, Options) -> close_lru() -> gen_server:call(couch_server, close_lru). -create(DbName, Options0) -> +create(DbName, Options) -> + case ets:lookup(couch_dbs_locks, DbName) of + [] -> + create_int(DbName, Options); + [{DbName, Reason}] -> + {error, {locked, Reason}} + end. + +create_int(DbName, Options0) -> Options = maybe_add_sys_db_callbacks(DbName, Options0), couch_partition:validate_dbname(DbName, Options), case gen_server:call(couch_server, {create, DbName, Options}, infinity) of @@ -251,6 +267,12 @@ init([]) -> {read_concurrency, true} ]), ets:new(couch_dbs_pid_to_name, [set, protected, named_table]), + ets:new(couch_dbs_locks, [ + set, + public, + named_table, + {read_concurrency, true} + ]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir, engines = Engines, @@ -774,6 +796,19 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) -> {error, {invalid_engine, Engine}} end. +lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) -> + case ets:lookup(couch_dbs, DbName) of + [] -> + true = ets:insert(couch_dbs_locks, {DbName, Reason}), + ok; + [#entry{}] -> + {error, already_opened} + end. + +unlock(DbName) when is_binary(DbName) -> + true = ets:delete(couch_dbs_locks, DbName), + ok. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). 
diff --git a/src/couch/test/eunit/couch_db_split_tests.erl b/src/couch/test/eunit/couch_db_split_tests.erl index 8e64c39ee..6e24c36ee 100644 --- a/src/couch/test/eunit/couch_db_split_tests.erl +++ b/src/couch/test/eunit/couch_db_split_tests.erl @@ -56,7 +56,8 @@ split_test_() -> fun should_fail_on_missing_source/1, fun should_fail_on_existing_target/1, fun should_fail_on_invalid_target_name/1, - fun should_crash_on_invalid_tmap/1 + fun should_crash_on_invalid_tmap/1, + fun should_fail_on_opened_target/1 ] } ] @@ -104,9 +105,23 @@ should_fail_on_missing_source(_DbName) -> should_fail_on_existing_target(DbName) -> Ranges = make_ranges(2), - TMap = maps:map(fun(_, _) -> DbName end, make_targets(Ranges)), + TMap = maps:map(fun(_, TName) -> + % We create the target but make sure to remove it from the cache so we + % hit the eexist error instead of already_opened + {ok, Db} = couch_db:create(TName, [?ADMIN_CTX]), + Pid = couch_db:get_pid(Db), + ok = couch_db:close(Db), + exit(Pid, kill), + test_util:wait(fun() -> + case ets:lookup(couch_dbs, TName) of + [] -> ok; + [_ | _] -> wait + end + end), + TName + end, make_targets(Ranges)), Response = couch_db_split:split(DbName, TMap, fun fake_pickfun/3), - ?_assertMatch({error, {target_create_error, DbName, eexist}}, Response). + ?_assertMatch({error, {target_create_error, _, eexist}}, Response). should_fail_on_invalid_target_name(DbName) -> @@ -127,6 +142,20 @@ should_crash_on_invalid_tmap(DbName) -> couch_db_split:split(DbName, TMap, fun fake_pickfun/3)). 
+should_fail_on_opened_target(DbName) -> + Ranges = make_ranges(2), + TMap = maps:map(fun(_, TName) -> + % We create and keep the target open but delete + % its file on disk so we don't fail with eexist + {ok, Db} = couch_db:create(TName, [?ADMIN_CTX]), + FilePath = couch_db:get_filepath(Db), + ok = file:delete(FilePath), + TName + end, make_targets(Ranges)), + ?_assertMatch({error, {target_create_error, _, already_opened}}, + couch_db_split:split(DbName, TMap, fun fake_pickfun/3)). + + copy_local_docs_test_() -> Cases = [ {"Should work with no docs", 0, 2}, diff --git a/src/couch/test/eunit/couch_db_tests.erl b/src/couch/test/eunit/couch_db_tests.erl index d64f7c640..dd2cb427d 100644 --- a/src/couch/test/eunit/couch_db_tests.erl +++ b/src/couch/test/eunit/couch_db_tests.erl @@ -80,7 +80,8 @@ open_db_test_()-> fun() -> ?tempdb() end, [ fun should_create_db_if_missing/1, - fun should_open_db_if_exists/1 + fun should_open_db_if_exists/1, + fun locking_should_work/1 ] } } @@ -157,6 +158,32 @@ should_open_db_if_exists(DbName) -> ?assert(lists:member(DbName, After)) end). +locking_should_work(DbName) -> + ?_test(begin + ?assertEqual(ok, couch_server:lock(DbName, <<"x">>)), + ?assertEqual({error, {locked, <<"x">>}}, couch_db:create(DbName, [])), + ?assertEqual(ok, couch_server:unlock(DbName)), + {ok, Db} = couch_db:create(DbName, []), + ?assertEqual({error, already_opened}, + couch_server:lock(DbName, <<>>)), + + ok = couch_db:close(Db), + catch exit(couch_db:get_pid(Db), kill), + test_util:wait(fun() -> + case ets:lookup(couch_dbs, DbName) of + [] -> ok; + [_ | _] -> wait + end + end), + + ?assertEqual(ok, couch_server:lock(DbName, <<"y">>)), + ?assertEqual({error, {locked, <<"y">>}}, + couch_db:open(DbName, [])), + + couch_server:unlock(DbName), + {ok, Db1} = couch_db:open(DbName, [{create_if_missing, true}]), + ok = couch_db:close(Db1) + end). create_db(DbName) -> create_db(DbName, []). 
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl index ab6202115..1e89755a9 100644 --- a/src/mem3/test/eunit/mem3_reshard_test.erl +++ b/src/mem3/test/eunit/mem3_reshard_test.erl @@ -72,7 +72,8 @@ mem3_reshard_db_test_() -> fun couch_events_are_emitted/1, fun retries_work/1, fun target_reset_in_initial_copy/1, - fun split_an_incomplete_shard_map/1 + fun split_an_incomplete_shard_map/1, + fun target_shards_are_locked/1 ] } } @@ -479,6 +480,38 @@ split_an_incomplete_shard_map(#{db1 := Db}) -> end)}. +% Opening a target db in initial copy phase will throw an error +target_shards_are_locked(#{db1 := Db}) -> + {timeout, ?TIMEOUT, ?_test(begin + add_test_docs(Db, #{docs => 10}), + + % Make the job stop right when it was about to copy the docs + TestPid = self(), + meck:new(couch_db, [passthrough]), + meck:expect(couch_db, start_link, fun(Engine, TName, FilePath, Opts) -> + TestPid ! {start_link, self(), TName}, + receive + continue -> + meck:passthrough([Engine, TName, FilePath, Opts]) + end + end), + + [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), + {ok, JobId} = mem3_reshard:start_split_job(Shard), + {Target0, JobPid} = receive + {start_link, Pid, TName} -> {TName, Pid} + end, + ?assertEqual({error, {locked, <<"shard splitting">>}}, + couch_db:open_int(Target0, [])), + + % Send two continues for two targets + JobPid ! continue, + JobPid ! continue, + + wait_state(JobId, completed) + end)}. + + intercept_state(State) -> TestPid = self(), meck:new(mem3_reshard_job, [passthrough]), |