diff options
-rw-r--r-- | src/couch/src/couch_db_split.erl | 11 | ||||
-rw-r--r-- | src/couch/src/couch_server.erl | 39 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_db_split_tests.erl | 35 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_db_tests.erl | 29 | ||||
-rw-r--r-- | src/mem3/test/eunit/mem3_reshard_test.erl | 35 |
5 files changed, 141 insertions, 8 deletions
diff --git a/src/couch/src/couch_db_split.erl b/src/couch/src/couch_db_split.erl index 5bf98b6fd..3a1f98d3e 100644 --- a/src/couch/src/couch_db_split.erl +++ b/src/couch/src/couch_db_split.erl @@ -132,6 +132,12 @@ split(SourceDb, Partitioned, Engine, Targets0, PickFun, {M, F, A} = HashFun) -> {error, E} -> throw({target_create_error, DbName, E, Map}) end, + case couch_server:lock(DbName, <<"shard splitting">>) of + ok -> + ok; + {error, Err} -> + throw({target_create_error, DbName, Err, Map}) + end, {ok, Filepath} = couch_server:get_engine_path(DbName, Engine), Opts = [create, ?ADMIN_CTX] ++ case Partitioned of true -> [{props, [{partitioned, true}, {hash, [M, F, A]}]}]; @@ -164,7 +170,9 @@ split(SourceDb, Partitioned, Engine, Targets0, PickFun, {M, F, A} = HashFun) -> cleanup_targets(#{} = Targets, Engine) -> maps:map(fun(_, #target{db = Db} = T) -> ok = stop_target_db(Db), - delete_target(couch_db:name(Db), Engine), + DbName = couch_db:name(Db), + delete_target(DbName, Engine), + couch_server:unlock(DbName), T end, Targets). @@ -182,6 +190,7 @@ stop_target_db(Db) -> Pid = couch_db:get_pid(Db), catch unlink(Pid), catch exit(Pid, kill), + couch_server:unlock(couch_db:name(Db)), ok. diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index ab0122eec..eaca3ee9b 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -26,6 +26,7 @@ -export([exists/1]). -export([get_engine_extensions/0]). -export([get_engine_path/2]). +-export([lock/2, unlock/1]). % config_listener api -export([handle_config_change/5, handle_config_terminate/3]). @@ -77,8 +78,15 @@ get_stats() -> sup_start_link() -> gen_server:start_link({local, couch_server}, couch_server, [], []). +open(DbName, Options) -> + case ets:lookup(couch_dbs_locks, DbName) of + [] -> + open_int(DbName, Options); + [{DbName, Reason}] -> + {error, {locked, Reason}} + end. 
-open(DbName, Options0) -> +open_int(DbName, Options0) -> Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}), case ets:lookup(couch_dbs, DbName) of [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked -> @@ -115,7 +123,15 @@ update_lru(DbName, Options) -> close_lru() -> gen_server:call(couch_server, close_lru). -create(DbName, Options0) -> +create(DbName, Options) -> + case ets:lookup(couch_dbs_locks, DbName) of + [] -> + create_int(DbName, Options); + [{DbName, Reason}] -> + {error, {locked, Reason}} + end. + +create_int(DbName, Options0) -> Options = maybe_add_sys_db_callbacks(DbName, Options0), couch_partition:validate_dbname(DbName, Options), case gen_server:call(couch_server, {create, DbName, Options}, infinity) of @@ -251,6 +267,12 @@ init([]) -> {read_concurrency, true} ]), ets:new(couch_dbs_pid_to_name, [set, protected, named_table]), + ets:new(couch_dbs_locks, [ + set, + public, + named_table, + {read_concurrency, true} + ]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir, engines = Engines, @@ -774,6 +796,19 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) -> {error, {invalid_engine, Engine}} end. +lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) -> + case ets:lookup(couch_dbs, DbName) of + [] -> + true = ets:insert(couch_dbs_locks, {DbName, Reason}), + ok; + [#entry{}] -> + {error, already_opened} + end. + +unlock(DbName) when is_binary(DbName) -> + true = ets:delete(couch_dbs_locks, DbName), + ok. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). 
diff --git a/src/couch/test/eunit/couch_db_split_tests.erl b/src/couch/test/eunit/couch_db_split_tests.erl index 8e64c39ee..6e24c36ee 100644 --- a/src/couch/test/eunit/couch_db_split_tests.erl +++ b/src/couch/test/eunit/couch_db_split_tests.erl @@ -56,7 +56,8 @@ split_test_() -> fun should_fail_on_missing_source/1, fun should_fail_on_existing_target/1, fun should_fail_on_invalid_target_name/1, - fun should_crash_on_invalid_tmap/1 + fun should_crash_on_invalid_tmap/1, + fun should_fail_on_opened_target/1 ] } ] @@ -104,9 +105,23 @@ should_fail_on_missing_source(_DbName) -> should_fail_on_existing_target(DbName) -> Ranges = make_ranges(2), - TMap = maps:map(fun(_, _) -> DbName end, make_targets(Ranges)), + TMap = maps:map(fun(_, TName) -> + % We create the target but make sure to remove it from the cache so we + % hit the eexist error instead of already_opened + {ok, Db} = couch_db:create(TName, [?ADMIN_CTX]), + Pid = couch_db:get_pid(Db), + ok = couch_db:close(Db), + exit(Pid, kill), + test_util:wait(fun() -> + case ets:lookup(couch_dbs, TName) of + [] -> ok; + [_ | _] -> wait + end + end), + TName + end, make_targets(Ranges)), Response = couch_db_split:split(DbName, TMap, fun fake_pickfun/3), - ?_assertMatch({error, {target_create_error, DbName, eexist}}, Response). + ?_assertMatch({error, {target_create_error, _, eexist}}, Response). should_fail_on_invalid_target_name(DbName) -> @@ -127,6 +142,20 @@ should_crash_on_invalid_tmap(DbName) -> couch_db_split:split(DbName, TMap, fun fake_pickfun/3)). 
+should_fail_on_opened_target(DbName) -> + Ranges = make_ranges(2), + TMap = maps:map(fun(_, TName) -> + % We create and keep the target open but delete + % its file on disk so we don't fail with eexist + {ok, Db} = couch_db:create(TName, [?ADMIN_CTX]), + FilePath = couch_db:get_filepath(Db), + ok = file:delete(FilePath), + TName + end, make_targets(Ranges)), + ?_assertMatch({error, {target_create_error, _, already_opened}}, + couch_db_split:split(DbName, TMap, fun fake_pickfun/3)). + + copy_local_docs_test_() -> Cases = [ {"Should work with no docs", 0, 2}, diff --git a/src/couch/test/eunit/couch_db_tests.erl b/src/couch/test/eunit/couch_db_tests.erl index d64f7c640..dd2cb427d 100644 --- a/src/couch/test/eunit/couch_db_tests.erl +++ b/src/couch/test/eunit/couch_db_tests.erl @@ -80,7 +80,8 @@ open_db_test_()-> fun() -> ?tempdb() end, [ fun should_create_db_if_missing/1, - fun should_open_db_if_exists/1 + fun should_open_db_if_exists/1, + fun locking_should_work/1 ] } } @@ -157,6 +158,32 @@ should_open_db_if_exists(DbName) -> ?assert(lists:member(DbName, After)) end). +locking_should_work(DbName) -> + ?_test(begin + ?assertEqual(ok, couch_server:lock(DbName, <<"x">>)), + ?assertEqual({error, {locked, <<"x">>}}, couch_db:create(DbName, [])), + ?assertEqual(ok, couch_server:unlock(DbName)), + {ok, Db} = couch_db:create(DbName, []), + ?assertEqual({error, already_opened}, + couch_server:lock(DbName, <<>>)), + + ok = couch_db:close(Db), + catch exit(couch_db:get_pid(Db), kill), + test_util:wait(fun() -> + case ets:lookup(couch_dbs, DbName) of + [] -> ok; + [_ | _] -> wait + end + end), + + ?assertEqual(ok, couch_server:lock(DbName, <<"y">>)), + ?assertEqual({error, {locked, <<"y">>}}, + couch_db:open(DbName, [])), + + couch_server:unlock(DbName), + {ok, Db1} = couch_db:open(DbName, [{create_if_missing, true}]), + ok = couch_db:close(Db1) + end). create_db(DbName) -> create_db(DbName, []). 
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl index ab6202115..1e89755a9 100644 --- a/src/mem3/test/eunit/mem3_reshard_test.erl +++ b/src/mem3/test/eunit/mem3_reshard_test.erl @@ -72,7 +72,8 @@ mem3_reshard_db_test_() -> fun couch_events_are_emitted/1, fun retries_work/1, fun target_reset_in_initial_copy/1, - fun split_an_incomplete_shard_map/1 + fun split_an_incomplete_shard_map/1, + fun target_shards_are_locked/1 ] } } @@ -479,6 +480,38 @@ split_an_incomplete_shard_map(#{db1 := Db}) -> end)}. +% Opening a target db in initial copy phase will throw an error +target_shards_are_locked(#{db1 := Db}) -> + {timeout, ?TIMEOUT, ?_test(begin + add_test_docs(Db, #{docs => 10}), + + % Make the job stop right when it is about to copy the docs + TestPid = self(), + meck:new(couch_db, [passthrough]), + meck:expect(couch_db, start_link, fun(Engine, TName, FilePath, Opts) -> + TestPid ! {start_link, self(), TName}, + receive + continue -> + meck:passthrough([Engine, TName, FilePath, Opts]) + end + end), + + [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), + {ok, JobId} = mem3_reshard:start_split_job(Shard), + {Target0, JobPid} = receive + {start_link, Pid, TName} -> {TName, Pid} + end, + ?assertEqual({error, {locked, <<"shard splitting">>}}, + couch_db:open_int(Target0, [])), + + % Send two continues for two targets + JobPid ! continue, + JobPid ! continue, + + wait_state(JobId, completed) + end)}. + + intercept_state(State) -> TestPid = self(), meck:new(mem3_reshard_job, [passthrough]), |