diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2017-04-18 01:14:20 -0400 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2017-04-24 14:54:00 -0500 |
commit | c1c6891aafac548314c8eb610b8e63f1997b107c (patch) | |
tree | c8dbedf6fe2ba232dcc17057acf2a453f360d990 | |
parent | b71677f593fb62e590f37c2cbb5bb851bd12c11c (diff) | |
download | couchdb-c1c6891aafac548314c8eb610b8e63f1997b107c.tar.gz |
Add unit tests for mem3_shardsCOUCHDB-3376-fix-mem3-shards
COUCHDB-3376
-rw-r--r-- | src/mem3/src/mem3_shards.erl | 219 |
1 files changed, 211 insertions, 8 deletions
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl index bbdc3b534..3c2001b1b 100644 --- a/src/mem3/src/mem3_shards.erl +++ b/src/mem3/src/mem3_shards.erl @@ -476,32 +476,32 @@ cache_clear(St) -> true = ets:delete_all_objects(?ATIMES), St#st{cur_size=0}. -maybe_spawn_shard_writer(DbName, Shards) -> +maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) -> case ets:member(?OPENERS, DbName) of true -> ignore; false -> - spawn_shard_writer(DbName, Shards) + spawn_shard_writer(DbName, Shards, IdleTimeout) end. -spawn_shard_writer(DbName, Shards) -> - erlang:spawn(fun() -> shard_writer(DbName, Shards) end). +spawn_shard_writer(DbName, Shards, IdleTimeout) -> + erlang:spawn(fun() -> shard_writer(DbName, Shards, IdleTimeout) end). -shard_writer(DbName, Shards) -> +shard_writer(DbName, Shards, IdleTimeout) -> try receive write -> true = ets:insert(?SHARDS, Shards); cancel -> ok - after ?WRITE_IDLE_TIMEOUT -> + after IdleTimeout -> ok end after true = ets:delete_object(?OPENERS, {DbName, self()}) end. -flush_write(DbName, Writer) -> +flush_write(DbName, Writer, WriteTimeout) -> Ref = erlang:monitor(process, Writer), Writer ! write, receive @@ -509,7 +509,7 @@ flush_write(DbName, Writer) -> ok; {'DOWN', Ref, _, _, Error} -> erlang:exit({mem3_shards_bad_write, Error}) - after ?WRITE_TIMEOUT -> + after WriteTimeout -> erlang:exit({mem3_shards_write_timeout, DbName}) end. @@ -524,3 +524,206 @@ filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) -> filter_shards_by_name(Name, [S|Matches], Ss); filter_shards_by_name(Name, Matches, [_|Ss]) -> filter_shards_by_name(Name, Matches, Ss). + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +-define(DB, <<"eunit_db_name">>). +-define(INFINITY, 99999999). + + +mem3_shards_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + t_maybe_spawn_shard_writer_already_exists(), + t_maybe_spawn_shard_writer_new(), + t_flush_writer_exists_normal(), + t_flush_writer_times_out(), + t_flush_writer_crashes(), + t_writer_deletes_itself_when_done(), + t_writer_does_not_delete_other_writers_for_same_shard(), + t_spawn_writer_in_load_shards_from_db(), + t_cache_insert_takes_new_update(), + t_cache_insert_ignores_stale_update_and_kills_worker() + ] + }. + + +setup() -> + ets:new(?SHARDS, [bag, public, named_table, {keypos, #shard.dbname}]), + ets:new(?OPENERS, [bag, public, named_table]), + ets:new(?DBS, [set, public, named_table]), + ets:new(?ATIMES, [ordered_set, public, named_table]), + meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"), + ok. + + +teardown(_) -> + meck:unload(), + ets:delete(?ATIMES), + ets:delete(?DBS), + ets:delete(?OPENERS), + ets:delete(?SHARDS). + + +t_maybe_spawn_shard_writer_already_exists() -> + ?_test(begin + ets:insert(?OPENERS, {?DB, self()}), + Shards = mock_shards(), + WRes = maybe_spawn_shard_writer(?DB, Shards, ?INFINITY), + ?assertEqual(ignore, WRes) + end). + + +t_maybe_spawn_shard_writer_new() -> + ?_test(begin + Shards = mock_shards(), + WPid = maybe_spawn_shard_writer(?DB, Shards, 1000), + WRef = erlang:monitor(process, WPid), + ?assert(is_pid(WPid)), + ?assert(is_process_alive(WPid)), + WPid ! write, + ?assertEqual(normal, wait_writer_result(WRef)), + ?assertEqual(Shards, ets:tab2list(?SHARDS)) + end). + + +t_flush_writer_exists_normal() -> + ?_test(begin + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + ?assertEqual(ok, flush_write(?DB, WPid, ?INFINITY)), + ?assertEqual(Shards, ets:tab2list(?SHARDS)) + end). + + +t_flush_writer_times_out() -> + ?_test(begin + WPid = spawn(fun() -> receive will_never_receive_this -> ok end end), + Error = {mem3_shards_write_timeout, ?DB}, + ?assertExit(Error, flush_write(?DB, WPid, 100)), + exit(WPid, kill) + end). + + +t_flush_writer_crashes() -> + ?_test(begin + WPid = spawn(fun() -> receive write -> exit('kapow!') end end), + Error = {mem3_shards_bad_write, 'kapow!'}, + ?assertExit(Error, flush_write(?DB, WPid, 1000)) + end). + + +t_writer_deletes_itself_when_done() -> + ?_test(begin + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + WRef = erlang:monitor(process, WPid), + ets:insert(?OPENERS, {?DB, WPid}), + WPid ! write, + ?assertEqual(normal, wait_writer_result(WRef)), + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([], ets:tab2list(?OPENERS)) + end). + + +t_writer_does_not_delete_other_writers_for_same_shard() -> + ?_test(begin + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + WRef = erlang:monitor(process, WPid), + ets:insert(?OPENERS, {?DB, WPid}), + ets:insert(?OPENERS, {?DB, self()}), % should not be deleted + WPid ! write, + ?assertEqual(normal, wait_writer_result(WRef)), + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual(1, ets:info(?OPENERS, size)), + ?assertEqual([{?DB, self()}], ets:tab2list(?OPENERS)) + end). + + +t_spawn_writer_in_load_shards_from_db() -> + ?_test(begin + meck:expect(couch_db, open_doc, 3, {ok, #doc{body = {[]}}}), + meck:expect(couch_db, get_update_seq, 1, 1), + meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()), + erlang:register(?MODULE, self()), % register to get cache_insert cast + load_shards_from_db(#db{name = <<"testdb">>}, ?DB), + meck:validate(couch_db), + meck:validate(mem3_util), + Cast = receive + {'$gen_cast', Msg} -> Msg + after 1000 -> + timeout + end, + ?assertMatch({cache_insert, ?DB, Pid, 1} when is_pid(Pid), Cast), + {cache_insert, _, WPid, _} = Cast, + exit(WPid, kill), + ?assertEqual([{?DB, WPid}], ets:tab2list(?OPENERS)) + end). + + +t_cache_insert_takes_new_update() -> + ?_test(begin + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + Msg = {cache_insert, ?DB, WPid, 2}, + {noreply, NewState} = handle_cast(Msg, mock_state(1)), + ?assertMatch(#st{cur_size = 1}, NewState), + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([], ets:tab2list(?OPENERS)) + end). + + +t_cache_insert_ignores_stale_update_and_kills_worker() -> + ?_test(begin + Shards = mock_shards(), + WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + WRef = erlang:monitor(process, WPid), + Msg = {cache_insert, ?DB, WPid, 1}, + {noreply, NewState} = handle_cast(Msg, mock_state(2)), + ?assertEqual(normal, wait_writer_result(WRef)), + ?assertMatch(#st{cur_size = 0}, NewState), + ?assertEqual([], ets:tab2list(?SHARDS)), + ?assertEqual([], ets:tab2list(?OPENERS)) + end). + + +mock_state(UpdateSeq) -> + #st{ + update_seq = UpdateSeq, + changes_pid = self(), + write_timeout = 1000 + }. + + +mock_shards() -> + [ + #ordered_shard{ + name = <<"testshardname">>, + node = node(), + dbname = ?DB, + range = [0,1], + order = 1 + } + ]. + + +wait_writer_result(WRef) -> + receive + {'DOWN', WRef, _, _, Result} -> + Result + after 1000 -> + timeout + end. + + +spawn_link_mock_writer(Db, Shards, Timeout) -> + erlang:spawn_link(fun() -> shard_writer(Db, Shards, Timeout) end). + +-endif. |