summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2017-04-18 01:14:20 -0400
committerPaul J. Davis <paul.joseph.davis@gmail.com>2017-04-24 14:54:00 -0500
commitc1c6891aafac548314c8eb610b8e63f1997b107c (patch)
treec8dbedf6fe2ba232dcc17057acf2a453f360d990
parentb71677f593fb62e590f37c2cbb5bb851bd12c11c (diff)
downloadcouchdb-COUCHDB-3376-fix-mem3-shards.tar.gz
Add unit tests for mem3_shardsCOUCHDB-3376-fix-mem3-shards
COUCHDB-3376
-rw-r--r--src/mem3/src/mem3_shards.erl219
1 files changed, 211 insertions, 8 deletions
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index bbdc3b534..3c2001b1b 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -476,32 +476,32 @@ cache_clear(St) ->
true = ets:delete_all_objects(?ATIMES),
St#st{cur_size=0}.
-maybe_spawn_shard_writer(DbName, Shards) ->
+maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) ->
case ets:member(?OPENERS, DbName) of
true ->
ignore;
false ->
- spawn_shard_writer(DbName, Shards)
+ spawn_shard_writer(DbName, Shards, IdleTimeout)
end.
-spawn_shard_writer(DbName, Shards) ->
- erlang:spawn(fun() -> shard_writer(DbName, Shards) end).
+spawn_shard_writer(DbName, Shards, IdleTimeout) ->
+ erlang:spawn(fun() -> shard_writer(DbName, Shards, IdleTimeout) end).
-shard_writer(DbName, Shards) ->
+shard_writer(DbName, Shards, IdleTimeout) ->
try
receive
write ->
true = ets:insert(?SHARDS, Shards);
cancel ->
ok
- after ?WRITE_IDLE_TIMEOUT ->
+ after IdleTimeout ->
ok
end
after
true = ets:delete_object(?OPENERS, {DbName, self()})
end.
-flush_write(DbName, Writer) ->
+flush_write(DbName, Writer, WriteTimeout) ->
Ref = erlang:monitor(process, Writer),
Writer ! write,
receive
@@ -509,7 +509,7 @@ flush_write(DbName, Writer) ->
ok;
{'DOWN', Ref, _, _, Error} ->
erlang:exit({mem3_shards_bad_write, Error})
- after ?WRITE_TIMEOUT ->
+ after WriteTimeout ->
erlang:exit({mem3_shards_write_timeout, DbName})
end.
@@ -524,3 +524,206 @@ filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) ->
filter_shards_by_name(Name, [S|Matches], Ss);
filter_shards_by_name(Name, Matches, [_|Ss]) ->
filter_shards_by_name(Name, Matches, Ss).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(DB, <<"eunit_db_name">>).
+-define(INFINITY, 99999999).
+
+
+mem3_shards_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ t_maybe_spawn_shard_writer_already_exists(),
+ t_maybe_spawn_shard_writer_new(),
+ t_flush_writer_exists_normal(),
+ t_flush_writer_times_out(),
+ t_flush_writer_crashes(),
+ t_writer_deletes_itself_when_done(),
+ t_writer_does_not_delete_other_writers_for_same_shard(),
+ t_spawn_writer_in_load_shards_from_db(),
+ t_cache_insert_takes_new_update(),
+ t_cache_insert_ignores_stale_update_and_kills_worker()
+ ]
+ }.
+
+
+setup() ->
+ ets:new(?SHARDS, [bag, public, named_table, {keypos, #shard.dbname}]),
+ ets:new(?OPENERS, [bag, public, named_table]),
+ ets:new(?DBS, [set, public, named_table]),
+ ets:new(?ATIMES, [ordered_set, public, named_table]),
+ meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
+ ok.
+
+
+teardown(_) ->
+ meck:unload(),
+ ets:delete(?ATIMES),
+ ets:delete(?DBS),
+ ets:delete(?OPENERS),
+ ets:delete(?SHARDS).
+
+
+t_maybe_spawn_shard_writer_already_exists() ->
+ ?_test(begin
+ ets:insert(?OPENERS, {?DB, self()}),
+ Shards = mock_shards(),
+ WRes = maybe_spawn_shard_writer(?DB, Shards, ?INFINITY),
+ ?assertEqual(ignore, WRes)
+ end).
+
+
+t_maybe_spawn_shard_writer_new() ->
+ ?_test(begin
+ Shards = mock_shards(),
+ WPid = maybe_spawn_shard_writer(?DB, Shards, 1000),
+ WRef = erlang:monitor(process, WPid),
+ ?assert(is_pid(WPid)),
+ ?assert(is_process_alive(WPid)),
+ WPid ! write,
+ ?assertEqual(normal, wait_writer_result(WRef)),
+ ?assertEqual(Shards, ets:tab2list(?SHARDS))
+ end).
+
+
+t_flush_writer_exists_normal() ->
+ ?_test(begin
+ Shards = mock_shards(),
+ WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
+ ?assertEqual(ok, flush_write(?DB, WPid, ?INFINITY)),
+ ?assertEqual(Shards, ets:tab2list(?SHARDS))
+ end).
+
+
+t_flush_writer_times_out() ->
+ ?_test(begin
+ WPid = spawn(fun() -> receive will_never_receive_this -> ok end end),
+ Error = {mem3_shards_write_timeout, ?DB},
+ ?assertExit(Error, flush_write(?DB, WPid, 100)),
+ exit(WPid, kill)
+ end).
+
+
+t_flush_writer_crashes() ->
+ ?_test(begin
+ WPid = spawn(fun() -> receive write -> exit('kapow!') end end),
+ Error = {mem3_shards_bad_write, 'kapow!'},
+ ?assertExit(Error, flush_write(?DB, WPid, 1000))
+ end).
+
+
+t_writer_deletes_itself_when_done() ->
+ ?_test(begin
+ Shards = mock_shards(),
+ WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
+ WRef = erlang:monitor(process, WPid),
+ ets:insert(?OPENERS, {?DB, WPid}),
+ WPid ! write,
+ ?assertEqual(normal, wait_writer_result(WRef)),
+ ?assertEqual(Shards, ets:tab2list(?SHARDS)),
+ ?assertEqual([], ets:tab2list(?OPENERS))
+ end).
+
+
+t_writer_does_not_delete_other_writers_for_same_shard() ->
+ ?_test(begin
+ Shards = mock_shards(),
+ WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
+ WRef = erlang:monitor(process, WPid),
+ ets:insert(?OPENERS, {?DB, WPid}),
+ ets:insert(?OPENERS, {?DB, self()}), % should not be deleted
+ WPid ! write,
+ ?assertEqual(normal, wait_writer_result(WRef)),
+ ?assertEqual(Shards, ets:tab2list(?SHARDS)),
+ ?assertEqual(1, ets:info(?OPENERS, size)),
+ ?assertEqual([{?DB, self()}], ets:tab2list(?OPENERS))
+ end).
+
+
+t_spawn_writer_in_load_shards_from_db() ->
+ ?_test(begin
+ meck:expect(couch_db, open_doc, 3, {ok, #doc{body = {[]}}}),
+ meck:expect(couch_db, get_update_seq, 1, 1),
+ meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()),
+ erlang:register(?MODULE, self()), % register to get cache_insert cast
+ load_shards_from_db(#db{name = <<"testdb">>}, ?DB),
+ meck:validate(couch_db),
+ meck:validate(mem3_util),
+ Cast = receive
+ {'$gen_cast', Msg} -> Msg
+ after 1000 ->
+ timeout
+ end,
+ ?assertMatch({cache_insert, ?DB, Pid, 1} when is_pid(Pid), Cast),
+ {cache_insert, _, WPid, _} = Cast,
+ exit(WPid, kill),
+ ?assertEqual([{?DB, WPid}], ets:tab2list(?OPENERS))
+ end).
+
+
+t_cache_insert_takes_new_update() ->
+ ?_test(begin
+ Shards = mock_shards(),
+ WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
+ Msg = {cache_insert, ?DB, WPid, 2},
+ {noreply, NewState} = handle_cast(Msg, mock_state(1)),
+ ?assertMatch(#st{cur_size = 1}, NewState),
+ ?assertEqual(Shards, ets:tab2list(?SHARDS)),
+ ?assertEqual([], ets:tab2list(?OPENERS))
+ end).
+
+
+t_cache_insert_ignores_stale_update_and_kills_worker() ->
+ ?_test(begin
+ Shards = mock_shards(),
+ WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY),
+ WRef = erlang:monitor(process, WPid),
+ Msg = {cache_insert, ?DB, WPid, 1},
+ {noreply, NewState} = handle_cast(Msg, mock_state(2)),
+ ?assertEqual(normal, wait_writer_result(WRef)),
+ ?assertMatch(#st{cur_size = 0}, NewState),
+ ?assertEqual([], ets:tab2list(?SHARDS)),
+ ?assertEqual([], ets:tab2list(?OPENERS))
+ end).
+
+
+mock_state(UpdateSeq) ->
+ #st{
+ update_seq = UpdateSeq,
+ changes_pid = self(),
+ write_timeout = 1000
+ }.
+
+
+mock_shards() ->
+ [
+ #ordered_shard{
+ name = <<"testshardname">>,
+ node = node(),
+ dbname = ?DB,
+ range = [0,1],
+ order = 1
+ }
+ ].
+
+
+wait_writer_result(WRef) ->
+ receive
+ {'DOWN', WRef, _, _, Result} ->
+ Result
+ after 1000 ->
+ timeout
+ end.
+
+
+spawn_link_mock_writer(Db, Shards, Timeout) ->
+ erlang:spawn_link(fun() -> shard_writer(Db, Shards, Timeout) end).
+
+-endif.