summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couch/src/couch_db_split.erl11
-rw-r--r--src/couch/src/couch_server.erl39
-rw-r--r--src/couch/test/eunit/couch_db_split_tests.erl35
-rw-r--r--src/couch/test/eunit/couch_db_tests.erl29
-rw-r--r--src/mem3/test/eunit/mem3_reshard_test.erl35
5 files changed, 141 insertions, 8 deletions
diff --git a/src/couch/src/couch_db_split.erl b/src/couch/src/couch_db_split.erl
index 5bf98b6fd..3a1f98d3e 100644
--- a/src/couch/src/couch_db_split.erl
+++ b/src/couch/src/couch_db_split.erl
@@ -132,6 +132,12 @@ split(SourceDb, Partitioned, Engine, Targets0, PickFun, {M, F, A} = HashFun) ->
{error, E} ->
throw({target_create_error, DbName, E, Map})
end,
+ case couch_server:lock(DbName, <<"shard splitting">>) of
+ ok ->
+ ok;
+ {error, Err} ->
+ throw({target_create_error, DbName, Err, Map})
+ end,
{ok, Filepath} = couch_server:get_engine_path(DbName, Engine),
Opts = [create, ?ADMIN_CTX] ++ case Partitioned of
true -> [{props, [{partitioned, true}, {hash, [M, F, A]}]}];
@@ -164,7 +170,9 @@ split(SourceDb, Partitioned, Engine, Targets0, PickFun, {M, F, A} = HashFun) ->
cleanup_targets(#{} = Targets, Engine) ->
maps:map(fun(_, #target{db = Db} = T) ->
ok = stop_target_db(Db),
- delete_target(couch_db:name(Db), Engine),
+ DbName = couch_db:name(Db),
+ delete_target(DbName, Engine),
+ couch_server:unlock(DbName),
T
end, Targets).
@@ -182,6 +190,7 @@ stop_target_db(Db) ->
Pid = couch_db:get_pid(Db),
catch unlink(Pid),
catch exit(Pid, kill),
+ couch_server:unlock(couch_db:name(Db)),
ok.
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index ab0122eec..eaca3ee9b 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -26,6 +26,7 @@
-export([exists/1]).
-export([get_engine_extensions/0]).
-export([get_engine_path/2]).
+-export([lock/2, unlock/1]).
% config_listener api
-export([handle_config_change/5, handle_config_terminate/3]).
@@ -77,8 +78,15 @@ get_stats() ->
sup_start_link() ->
gen_server:start_link({local, couch_server}, couch_server, [], []).
+open(DbName, Options) ->
+ case ets:lookup(couch_dbs_locks, DbName) of
+ [] ->
+ open_int(DbName, Options);
+ [{DbName, Reason}] ->
+ {error, {locked, Reason}}
+ end.
-open(DbName, Options0) ->
+open_int(DbName, Options0) ->
Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
case ets:lookup(couch_dbs, DbName) of
[#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
@@ -115,7 +123,15 @@ update_lru(DbName, Options) ->
close_lru() ->
gen_server:call(couch_server, close_lru).
-create(DbName, Options0) ->
+create(DbName, Options) ->
+ case ets:lookup(couch_dbs_locks, DbName) of
+ [] ->
+ create_int(DbName, Options);
+ [{DbName, Reason}] ->
+ {error, {locked, Reason}}
+ end.
+
+create_int(DbName, Options0) ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
couch_partition:validate_dbname(DbName, Options),
case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
@@ -251,6 +267,12 @@ init([]) ->
{read_concurrency, true}
]),
ets:new(couch_dbs_pid_to_name, [set, protected, named_table]),
+ ets:new(couch_dbs_locks, [
+ set,
+ public,
+ named_table,
+ {read_concurrency, true}
+ ]),
process_flag(trap_exit, true),
{ok, #server{root_dir=RootDir,
engines = Engines,
@@ -774,6 +796,19 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) ->
{error, {invalid_engine, Engine}}
end.
+lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) ->
+ case ets:lookup(couch_dbs, DbName) of
+ [] ->
+ true = ets:insert(couch_dbs_locks, {DbName, Reason}),
+ ok;
+ [#entry{}] ->
+ {error, already_opened}
+ end.
+
+unlock(DbName) when is_binary(DbName) ->
+ true = ets:delete(couch_dbs_locks, DbName),
+ ok.
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/src/couch/test/eunit/couch_db_split_tests.erl b/src/couch/test/eunit/couch_db_split_tests.erl
index 8e64c39ee..6e24c36ee 100644
--- a/src/couch/test/eunit/couch_db_split_tests.erl
+++ b/src/couch/test/eunit/couch_db_split_tests.erl
@@ -56,7 +56,8 @@ split_test_() ->
fun should_fail_on_missing_source/1,
fun should_fail_on_existing_target/1,
fun should_fail_on_invalid_target_name/1,
- fun should_crash_on_invalid_tmap/1
+ fun should_crash_on_invalid_tmap/1,
+ fun should_fail_on_opened_target/1
]
}
]
@@ -104,9 +105,23 @@ should_fail_on_missing_source(_DbName) ->
should_fail_on_existing_target(DbName) ->
Ranges = make_ranges(2),
- TMap = maps:map(fun(_, _) -> DbName end, make_targets(Ranges)),
+ TMap = maps:map(fun(_, TName) ->
+ % We create the target but make sure to remove it from the cache so we
+ % hit the eexist error instaed of already_opened
+ {ok, Db} = couch_db:create(TName, [?ADMIN_CTX]),
+ Pid = couch_db:get_pid(Db),
+ ok = couch_db:close(Db),
+ exit(Pid, kill),
+ test_util:wait(fun() ->
+ case ets:lookup(couch_dbs, TName) of
+ [] -> ok;
+ [_ | _] -> wait
+ end
+ end),
+ TName
+ end, make_targets(Ranges)),
Response = couch_db_split:split(DbName, TMap, fun fake_pickfun/3),
- ?_assertMatch({error, {target_create_error, DbName, eexist}}, Response).
+ ?_assertMatch({error, {target_create_error, _, eexist}}, Response).
should_fail_on_invalid_target_name(DbName) ->
@@ -127,6 +142,20 @@ should_crash_on_invalid_tmap(DbName) ->
couch_db_split:split(DbName, TMap, fun fake_pickfun/3)).
+should_fail_on_opened_target(DbName) ->
+ Ranges = make_ranges(2),
+ TMap = maps:map(fun(_, TName) ->
+ % We create and keep the target open but delete
+ % its file on disk so we don't fail with eexist
+ {ok, Db} = couch_db:create(TName, [?ADMIN_CTX]),
+ FilePath = couch_db:get_filepath(Db),
+ ok = file:delete(FilePath),
+ TName
+ end, make_targets(Ranges)),
+ ?_assertMatch({error, {target_create_error, _, already_opened}},
+ couch_db_split:split(DbName, TMap, fun fake_pickfun/3)).
+
+
copy_local_docs_test_() ->
Cases = [
{"Should work with no docs", 0, 2},
diff --git a/src/couch/test/eunit/couch_db_tests.erl b/src/couch/test/eunit/couch_db_tests.erl
index d64f7c640..dd2cb427d 100644
--- a/src/couch/test/eunit/couch_db_tests.erl
+++ b/src/couch/test/eunit/couch_db_tests.erl
@@ -80,7 +80,8 @@ open_db_test_()->
fun() -> ?tempdb() end,
[
fun should_create_db_if_missing/1,
- fun should_open_db_if_exists/1
+ fun should_open_db_if_exists/1,
+ fun locking_should_work/1
]
}
}
@@ -157,6 +158,32 @@ should_open_db_if_exists(DbName) ->
?assert(lists:member(DbName, After))
end).
+locking_should_work(DbName) ->
+ ?_test(begin
+ ?assertEqual(ok, couch_server:lock(DbName, <<"x">>)),
+ ?assertEqual({error, {locked, <<"x">>}}, couch_db:create(DbName, [])),
+ ?assertEqual(ok, couch_server:unlock(DbName)),
+ {ok, Db} = couch_db:create(DbName, []),
+ ?assertEqual({error, already_opened},
+ couch_server:lock(DbName, <<>>)),
+
+ ok = couch_db:close(Db),
+ catch exit(couch_db:get_pid(Db), kill),
+ test_util:wait(fun() ->
+ case ets:lookup(couch_dbs, DbName) of
+ [] -> ok;
+ [_ | _] -> wait
+ end
+ end),
+
+ ?assertEqual(ok, couch_server:lock(DbName, <<"y">>)),
+ ?assertEqual({error, {locked, <<"y">>}},
+ couch_db:open(DbName, [])),
+
+ couch_server:unlock(DbName),
+ {ok, Db1} = couch_db:open(DbName, [{create_if_missing, true}]),
+ ok = couch_db:close(Db1)
+ end).
create_db(DbName) ->
create_db(DbName, []).
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl
index ab6202115..1e89755a9 100644
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -72,7 +72,8 @@ mem3_reshard_db_test_() ->
fun couch_events_are_emitted/1,
fun retries_work/1,
fun target_reset_in_initial_copy/1,
- fun split_an_incomplete_shard_map/1
+ fun split_an_incomplete_shard_map/1,
+ fun target_shards_are_locked/1
]
}
}
@@ -479,6 +480,38 @@ split_an_incomplete_shard_map(#{db1 := Db}) ->
end)}.
+% Opening a db target db in initial copy phase will throw an error
+target_shards_are_locked(#{db1 := Db}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ add_test_docs(Db, #{docs => 10}),
+
+ % Make the job stops right when it was about to copy the docs
+ TestPid = self(),
+ meck:new(couch_db, [passthrough]),
+ meck:expect(couch_db, start_link, fun(Engine, TName, FilePath, Opts) ->
+ TestPid ! {start_link, self(), TName},
+ receive
+ continue ->
+ meck:passthrough([Engine, TName, FilePath, Opts])
+ end
+ end),
+
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ {Target0, JobPid} = receive
+ {start_link, Pid, TName} -> {TName, Pid}
+ end,
+ ?assertEqual({error, {locked, <<"shard splitting">>}},
+ couch_db:open_int(Target0, [])),
+
+ % Send two continues for two targets
+ JobPid ! continue,
+ JobPid ! continue,
+
+ wait_state(JobId, completed)
+ end)}.
+
+
intercept_state(State) ->
TestPid = self(),
meck:new(mem3_reshard_job, [passthrough]),