diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2017-04-14 12:18:49 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2017-04-21 10:46:07 -0500 |
commit | b71677f593fb62e590f37c2cbb5bb851bd12c11c (patch) | |
tree | 9971d2d35799528ef9de76405d18dd335628111f | |
parent | e4c3705def6021a6b801c0bc0ceaac4abbc7c0d8 (diff) | |
download | couchdb-b71677f593fb62e590f37c2cbb5bb851bd12c11c.tar.gz |
Use a temporary process when caching shard maps
This change introduces a new shard_writer process into the mem3_shards
caching approach. It turns out its not terribly difficult to create
thundering herd scenarios that back up the mem3_shards mailbox. And if
the Q value is large this backup can happen quite quickly.
This changes things so that we use a temporary process to perform the
actual `ets:insert/2` call which keeps the shard map out of mem3_shards'
message queue.
A second optimization is that only a single client will attempt to send
the shard map to begin with by checking the existence of the writer key
using `ets:insert_new/2`.
COUCHDB-3376
-rw-r--r-- | src/mem3/src/mem3_shards.erl | 96 |
1 files changed, 83 insertions, 13 deletions
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl index ca5deaf45..bbdc3b534 100644 --- a/src/mem3/src/mem3_shards.erl +++ b/src/mem3/src/mem3_shards.erl @@ -28,7 +28,8 @@ max_size = 25000, cur_size = 0, changes_pid, - update_seq + update_seq, + write_timeout }). -include_lib("mem3/include/mem3.hrl"). @@ -37,6 +38,7 @@ -define(DBS, mem3_dbs). -define(SHARDS, mem3_shards). -define(ATIMES, mem3_atimes). +-define(OPENERS, mem3_openers). -define(RELISTEN_DELAY, 5000). start_link() -> @@ -172,6 +174,13 @@ handle_config_change("mem3", "shard_cache_size", SizeList, _, _) -> {ok, gen_server:call(?MODULE, {set_max_size, Size}, infinity)}; handle_config_change("mem3", "shards_db", _DbName, _, _) -> {ok, gen_server:call(?MODULE, shard_db_changed, infinity)}; +handle_config_change("mem3", "shard_write_timeout", Timeout, _, _) -> + Timeout = try + list_to_integer(Timeout) + catch _:_ -> + 1000 + end, + {ok, gen_server:call(?MODULE, {set_write_timeout, Timeout})}; handle_config_change(_, _, _, _, _) -> {ok, nil}. @@ -183,21 +192,24 @@ handle_config_terminate(_Server, _Reason, _State) -> init([]) -> ets:new(?SHARDS, [ bag, - protected, + public, named_table, {keypos,#shard.dbname}, {read_concurrency, true} ]), ets:new(?DBS, [set, protected, named_table]), ets:new(?ATIMES, [ordered_set, protected, named_table]), + ets:new(?OPENERS, [bag, public, named_table]), ok = config:listen_for_changes(?MODULE, nil), SizeList = config:get("mem3", "shard_cache_size", "25000"), + WriteTimeout = config:get_integer("mem3", "shard_write_timeout", 1000), UpdateSeq = get_update_seq(), {ok, #st{ max_size = list_to_integer(SizeList), cur_size = 0, changes_pid = start_changes_listener(UpdateSeq), - update_seq = UpdateSeq + update_seq = UpdateSeq, + write_timeout = WriteTimeout }}. handle_call({set_max_size, Size}, _From, St) -> @@ -205,6 +217,8 @@ handle_call({set_max_size, Size}, _From, St) -> handle_call(shard_db_changed, _From, St) -> exit(St#st.changes_pid, shard_db_changed), {reply, ok, St}; +handle_call({set_write_timeout, Timeout}, _From, St) -> + {reply, ok, St#st{write_timeout = Timeout}}; handle_call(_Call, _From, St) -> {noreply, St}. @@ -212,23 +226,25 @@ handle_cast({cache_hit, DbName}, St) -> couch_stats:increment_counter([mem3, shard_cache, hit]), cache_hit(DbName), {noreply, St}; -handle_cast({cache_insert, DbName, Shards, UpdateSeq}, St) -> - couch_stats:increment_counter([mem3, shard_cache, miss]), +handle_cast({cache_insert, DbName, Writer, UpdateSeq}, St) -> % This comparison correctly uses the `<` operator % and not `=<`. The easiest way to understand why is % to think of when a _dbs db doesn't change. If it used % `=<` it would be impossible to insert anything into % the cache. NewSt = case UpdateSeq < St#st.update_seq of - true -> St; - false -> cache_free(cache_insert(St, DbName, Shards)) + true -> + Writer ! cancel, + St; + false -> + cache_free(cache_insert(St, DbName, Writer, St#st.write_timeout)) end, {noreply, NewSt}; handle_cast({cache_remove, DbName}, St) -> couch_stats:increment_counter([mem3, shard_cache, eviction]), {noreply, cache_remove(St, DbName)}; -handle_cast({cache_insert_change, DbName, Shards, UpdateSeq}, St) -> - Msg = {cache_insert, DbName, Shards, UpdateSeq}, +handle_cast({cache_insert_change, DbName, Writer, UpdateSeq}, St) -> + Msg = {cache_insert, DbName, Writer, UpdateSeq}, {noreply, NewSt} = handle_cast(Msg, St), {noreply, NewSt#st{update_seq = UpdateSeq}}; handle_cast({cache_remove_change, DbName, UpdateSeq}, St) -> @@ -333,7 +349,11 @@ changes_callback({change, {Change}, _}, _) -> [DbName, Reason]); {Doc} -> Shards = mem3_util:build_ordered_shards(DbName, Doc), - Msg = {cache_insert_change, DbName, Shards, Seq}, + IdleTimeout = config:get_integer( + "mem3", "writer_idle_timeout", 30000), + Writer = spawn_shard_writer(DbName, Shards, IdleTimeout), + ets:insert(?OPENERS, {DbName, Writer}), + Msg = {cache_insert_change, DbName, Writer, Seq}, gen_server:cast(?MODULE, Msg), [create_if_missing(mem3:name(S)) || S <- Shards, mem3:node(S) =:= node()] @@ -345,6 +365,7 @@ changes_callback(timeout, _) -> ok. load_shards_from_disk(DbName) when is_binary(DbName) -> + couch_stats:increment_counter([mem3, shard_cache, miss]), X = ?l2b(config:get("mem3", "shards_db", "_dbs")), {ok, Db} = mem3_util:ensure_exists(X), try @@ -358,7 +379,19 @@ load_shards_from_db(#db{} = ShardDb, DbName) -> {ok, #doc{body = {Props}}} -> Seq = couch_db:get_update_seq(ShardDb), Shards = mem3_util:build_ordered_shards(DbName, Props), - gen_server:cast(?MODULE, {cache_insert, DbName, Shards, Seq}), + IdleTimeout = config:get_integer("mem3", "writer_idle_timeout", 30000), + case maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) of + Writer when is_pid(Writer) -> + case ets:insert_new(?OPENERS, {DbName, Writer}) of + true -> + Msg = {cache_insert, DbName, Writer, Seq}, + gen_server:cast(?MODULE, Msg); + false -> + Writer ! cancel + end; + ignore -> + ok + end, Shards; {not_found, _} -> erlang:error(database_does_not_exist, ?b2l(DbName)) @@ -389,10 +422,10 @@ create_if_missing(Name) -> end end. -cache_insert(#st{cur_size=Cur}=St, DbName, Shards) -> +cache_insert(#st{cur_size=Cur}=St, DbName, Writer, Timeout) -> NewATime = now(), true = ets:delete(?SHARDS, DbName), - true = ets:insert(?SHARDS, Shards), + flush_write(DbName, Writer, Timeout), case ets:lookup(?DBS, DbName) of [{DbName, ATime}] -> true = ets:delete(?ATIMES, ATime), @@ -443,6 +476,43 @@ cache_clear(St) -> true = ets:delete_all_objects(?ATIMES), St#st{cur_size=0}. +maybe_spawn_shard_writer(DbName, Shards) -> + case ets:member(?OPENERS, DbName) of + true -> + ignore; + false -> + spawn_shard_writer(DbName, Shards) + end. + +spawn_shard_writer(DbName, Shards) -> + erlang:spawn(fun() -> shard_writer(DbName, Shards) end). + +shard_writer(DbName, Shards) -> + try + receive + write -> + true = ets:insert(?SHARDS, Shards); + cancel -> + ok + after ?WRITE_IDLE_TIMEOUT -> + ok + end + after + true = ets:delete_object(?OPENERS, {DbName, self()}) + end. + +flush_write(DbName, Writer) -> + Ref = erlang:monitor(process, Writer), + Writer ! write, + receive + {'DOWN', Ref, _, _, normal} -> + ok; + {'DOWN', Ref, _, _, Error} -> + erlang:exit({mem3_shards_bad_write, Error}) + after ?WRITE_TIMEOUT -> + erlang:exit({mem3_shards_write_timeout, DbName}) + end. + filter_shards_by_name(Name, Shards) -> filter_shards_by_name(Name, [], Shards). |