summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-01-04 17:08:05 -0500
committerNick Vatamaniuc <vatamane@apache.org>2020-01-04 17:15:26 -0500
commit4ed4e5dc239d9f4a1c86838c115d0527ed3dc9d0 (patch)
tree91b01e0e0cc36903a1e48108d496eef46a4d713b
parent9dedf6ad35b05536e8c413b5d3bc2737d1b81cf7 (diff)
downloadcouchdb-add-locking-for-shard-splitting-targets.tar.gz
Lock shard splitting targets during the initial copy phase (branch: add-locking-for-shard-splitting-targets)
During the initial copy phase target shards are opened outside the couch_server. Previously, it was possible to manually (via remsh for instance) open the same targets via the couch_server by using the `couch_db:open/2` API for example. That could lead to corruption as there would be two writers for the same DB file. In order to prevent such a scenario, introduce a mechanism for the shard splitter to lock the target shards such that any regular open call would fail during the initial copy phase. The locking mechanism is generic and would allow local locking of shards for possibly other reasons in the future as well.
-rw-r--r--src/couch/src/couch_db_split.erl11
-rw-r--r--src/couch/src/couch_server.erl39
-rw-r--r--src/couch/test/eunit/couch_db_split_tests.erl35
-rw-r--r--src/couch/test/eunit/couch_db_tests.erl29
-rw-r--r--src/mem3/test/eunit/mem3_reshard_test.erl35
5 files changed, 141 insertions, 8 deletions
diff --git a/src/couch/src/couch_db_split.erl b/src/couch/src/couch_db_split.erl
index 5bf98b6fd..3a1f98d3e 100644
--- a/src/couch/src/couch_db_split.erl
+++ b/src/couch/src/couch_db_split.erl
@@ -132,6 +132,12 @@ split(SourceDb, Partitioned, Engine, Targets0, PickFun, {M, F, A} = HashFun) ->
{error, E} ->
throw({target_create_error, DbName, E, Map})
end,
+ case couch_server:lock(DbName, <<"shard splitting">>) of
+ ok ->
+ ok;
+ {error, Err} ->
+ throw({target_create_error, DbName, Err, Map})
+ end,
{ok, Filepath} = couch_server:get_engine_path(DbName, Engine),
Opts = [create, ?ADMIN_CTX] ++ case Partitioned of
true -> [{props, [{partitioned, true}, {hash, [M, F, A]}]}];
@@ -164,7 +170,9 @@ split(SourceDb, Partitioned, Engine, Targets0, PickFun, {M, F, A} = HashFun) ->
cleanup_targets(#{} = Targets, Engine) ->
maps:map(fun(_, #target{db = Db} = T) ->
ok = stop_target_db(Db),
- delete_target(couch_db:name(Db), Engine),
+ DbName = couch_db:name(Db),
+ delete_target(DbName, Engine),
+ couch_server:unlock(DbName),
T
end, Targets).
@@ -182,6 +190,7 @@ stop_target_db(Db) ->
Pid = couch_db:get_pid(Db),
catch unlink(Pid),
catch exit(Pid, kill),
+ couch_server:unlock(couch_db:name(Db)),
ok.
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index ab0122eec..eaca3ee9b 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -26,6 +26,7 @@
-export([exists/1]).
-export([get_engine_extensions/0]).
-export([get_engine_path/2]).
+-export([lock/2, unlock/1]).
% config_listener api
-export([handle_config_change/5, handle_config_terminate/3]).
@@ -77,8 +78,15 @@ get_stats() ->
sup_start_link() ->
gen_server:start_link({local, couch_server}, couch_server, [], []).
+open(DbName, Options) ->
+ case ets:lookup(couch_dbs_locks, DbName) of
+ [] ->
+ open_int(DbName, Options);
+ [{DbName, Reason}] ->
+ {error, {locked, Reason}}
+ end.
-open(DbName, Options0) ->
+open_int(DbName, Options0) ->
Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
case ets:lookup(couch_dbs, DbName) of
[#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
@@ -115,7 +123,15 @@ update_lru(DbName, Options) ->
close_lru() ->
gen_server:call(couch_server, close_lru).
-create(DbName, Options0) ->
+create(DbName, Options) ->
+ case ets:lookup(couch_dbs_locks, DbName) of
+ [] ->
+ create_int(DbName, Options);
+ [{DbName, Reason}] ->
+ {error, {locked, Reason}}
+ end.
+
+create_int(DbName, Options0) ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
couch_partition:validate_dbname(DbName, Options),
case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
@@ -251,6 +267,12 @@ init([]) ->
{read_concurrency, true}
]),
ets:new(couch_dbs_pid_to_name, [set, protected, named_table]),
+ ets:new(couch_dbs_locks, [
+ set,
+ public,
+ named_table,
+ {read_concurrency, true}
+ ]),
process_flag(trap_exit, true),
{ok, #server{root_dir=RootDir,
engines = Engines,
@@ -774,6 +796,19 @@ get_engine_path(DbName, Engine) when is_binary(DbName), is_atom(Engine) ->
{error, {invalid_engine, Engine}}
end.
+lock(DbName, Reason) when is_binary(DbName), is_binary(Reason) ->
+ case ets:lookup(couch_dbs, DbName) of
+ [] ->
+ true = ets:insert(couch_dbs_locks, {DbName, Reason}),
+ ok;
+ [#entry{}] ->
+ {error, already_opened}
+ end.
+
+unlock(DbName) when is_binary(DbName) ->
+ true = ets:delete(couch_dbs_locks, DbName),
+ ok.
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/src/couch/test/eunit/couch_db_split_tests.erl b/src/couch/test/eunit/couch_db_split_tests.erl
index 8e64c39ee..6e24c36ee 100644
--- a/src/couch/test/eunit/couch_db_split_tests.erl
+++ b/src/couch/test/eunit/couch_db_split_tests.erl
@@ -56,7 +56,8 @@ split_test_() ->
fun should_fail_on_missing_source/1,
fun should_fail_on_existing_target/1,
fun should_fail_on_invalid_target_name/1,
- fun should_crash_on_invalid_tmap/1
+ fun should_crash_on_invalid_tmap/1,
+ fun should_fail_on_opened_target/1
]
}
]
@@ -104,9 +105,23 @@ should_fail_on_missing_source(_DbName) ->
should_fail_on_existing_target(DbName) ->
Ranges = make_ranges(2),
- TMap = maps:map(fun(_, _) -> DbName end, make_targets(Ranges)),
+ TMap = maps:map(fun(_, TName) ->
+ % We create the target but make sure to remove it from the cache so we
+        % hit the eexist error instead of already_opened
+ {ok, Db} = couch_db:create(TName, [?ADMIN_CTX]),
+ Pid = couch_db:get_pid(Db),
+ ok = couch_db:close(Db),
+ exit(Pid, kill),
+ test_util:wait(fun() ->
+ case ets:lookup(couch_dbs, TName) of
+ [] -> ok;
+ [_ | _] -> wait
+ end
+ end),
+ TName
+ end, make_targets(Ranges)),
Response = couch_db_split:split(DbName, TMap, fun fake_pickfun/3),
- ?_assertMatch({error, {target_create_error, DbName, eexist}}, Response).
+ ?_assertMatch({error, {target_create_error, _, eexist}}, Response).
should_fail_on_invalid_target_name(DbName) ->
@@ -127,6 +142,20 @@ should_crash_on_invalid_tmap(DbName) ->
couch_db_split:split(DbName, TMap, fun fake_pickfun/3)).
+should_fail_on_opened_target(DbName) ->
+ Ranges = make_ranges(2),
+ TMap = maps:map(fun(_, TName) ->
+ % We create and keep the target open but delete
+ % its file on disk so we don't fail with eexist
+ {ok, Db} = couch_db:create(TName, [?ADMIN_CTX]),
+ FilePath = couch_db:get_filepath(Db),
+ ok = file:delete(FilePath),
+ TName
+ end, make_targets(Ranges)),
+ ?_assertMatch({error, {target_create_error, _, already_opened}},
+ couch_db_split:split(DbName, TMap, fun fake_pickfun/3)).
+
+
copy_local_docs_test_() ->
Cases = [
{"Should work with no docs", 0, 2},
diff --git a/src/couch/test/eunit/couch_db_tests.erl b/src/couch/test/eunit/couch_db_tests.erl
index d64f7c640..dd2cb427d 100644
--- a/src/couch/test/eunit/couch_db_tests.erl
+++ b/src/couch/test/eunit/couch_db_tests.erl
@@ -80,7 +80,8 @@ open_db_test_()->
fun() -> ?tempdb() end,
[
fun should_create_db_if_missing/1,
- fun should_open_db_if_exists/1
+ fun should_open_db_if_exists/1,
+ fun locking_should_work/1
]
}
}
@@ -157,6 +158,32 @@ should_open_db_if_exists(DbName) ->
?assert(lists:member(DbName, After))
end).
+locking_should_work(DbName) ->
+ ?_test(begin
+ ?assertEqual(ok, couch_server:lock(DbName, <<"x">>)),
+ ?assertEqual({error, {locked, <<"x">>}}, couch_db:create(DbName, [])),
+ ?assertEqual(ok, couch_server:unlock(DbName)),
+ {ok, Db} = couch_db:create(DbName, []),
+ ?assertEqual({error, already_opened},
+ couch_server:lock(DbName, <<>>)),
+
+ ok = couch_db:close(Db),
+ catch exit(couch_db:get_pid(Db), kill),
+ test_util:wait(fun() ->
+ case ets:lookup(couch_dbs, DbName) of
+ [] -> ok;
+ [_ | _] -> wait
+ end
+ end),
+
+ ?assertEqual(ok, couch_server:lock(DbName, <<"y">>)),
+ ?assertEqual({error, {locked, <<"y">>}},
+ couch_db:open(DbName, [])),
+
+ couch_server:unlock(DbName),
+ {ok, Db1} = couch_db:open(DbName, [{create_if_missing, true}]),
+ ok = couch_db:close(Db1)
+ end).
create_db(DbName) ->
create_db(DbName, []).
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl
index ab6202115..1e89755a9 100644
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -72,7 +72,8 @@ mem3_reshard_db_test_() ->
fun couch_events_are_emitted/1,
fun retries_work/1,
fun target_reset_in_initial_copy/1,
- fun split_an_incomplete_shard_map/1
+ fun split_an_incomplete_shard_map/1,
+ fun target_shards_are_locked/1
]
}
}
@@ -479,6 +480,38 @@ split_an_incomplete_shard_map(#{db1 := Db}) ->
end)}.
+% Opening a target db during the initial copy phase will throw an error
+target_shards_are_locked(#{db1 := Db}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ add_test_docs(Db, #{docs => 10}),
+
+        % Make the job stop right when it is about to copy the docs
+ TestPid = self(),
+ meck:new(couch_db, [passthrough]),
+ meck:expect(couch_db, start_link, fun(Engine, TName, FilePath, Opts) ->
+ TestPid ! {start_link, self(), TName},
+ receive
+ continue ->
+ meck:passthrough([Engine, TName, FilePath, Opts])
+ end
+ end),
+
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ {Target0, JobPid} = receive
+ {start_link, Pid, TName} -> {TName, Pid}
+ end,
+ ?assertEqual({error, {locked, <<"shard splitting">>}},
+ couch_db:open_int(Target0, [])),
+
+ % Send two continues for two targets
+ JobPid ! continue,
+ JobPid ! continue,
+
+ wait_state(JobId, completed)
+ end)}.
+
+
intercept_state(State) ->
TestPid = self(),
meck:new(mem3_reshard_job, [passthrough]),