diff options
authorPaul J. Davis <>2017-04-14 12:18:49 -0500
committerPaul J. Davis <>2017-04-21 10:46:07 -0500
commitb71677f593fb62e590f37c2cbb5bb851bd12c11c (patch)
parente4c3705def6021a6b801c0bc0ceaac4abbc7c0d8 (diff)
Use a temporary process when caching shard maps
This change introduces a new shard_writer process into the mem3_shards caching approach. It turns out its not terribly difficult to create thundering herd scenarios that back up the mem3_shards mailbox. And if the Q value is large this backup can happen quite quickly. This changes things so that we use a temporary process to perform the actual `ets:insert/2` call which keeps the shard map out of mem3_shards' message queue. A second optimization is that only a single client will attempt to send the shard map to begin with by checking the existence of the writer key using `ets:insert_new/2`. COUCHDB-3376
1 files changed, 83 insertions, 13 deletions
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index ca5deaf45..bbdc3b534 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -28,7 +28,8 @@
max_size = 25000,
cur_size = 0,
- update_seq
+ update_seq,
+ write_timeout
@@ -37,6 +38,7 @@
-define(DBS, mem3_dbs).
-define(SHARDS, mem3_shards).
-define(ATIMES, mem3_atimes).
+-define(OPENERS, mem3_openers).
-define(RELISTEN_DELAY, 5000).
start_link() ->
@@ -172,6 +174,13 @@ handle_config_change("mem3", "shard_cache_size", SizeList, _, _) ->
{ok, gen_server:call(?MODULE, {set_max_size, Size}, infinity)};
handle_config_change("mem3", "shards_db", _DbName, _, _) ->
{ok, gen_server:call(?MODULE, shard_db_changed, infinity)};
+handle_config_change("mem3", "shard_write_timeout", Timeout, _, _) ->
+ Timeout = try
+ list_to_integer(Timeout)
+ catch _:_ ->
+ 1000
+ end,
+ {ok, gen_server:call(?MODULE, {set_write_timeout, Timeout})};
handle_config_change(_, _, _, _, _) ->
{ok, nil}.
@@ -183,21 +192,24 @@ handle_config_terminate(_Server, _Reason, _State) ->
init([]) ->
ets:new(?SHARDS, [
- protected,
+ public,
{read_concurrency, true}
ets:new(?DBS, [set, protected, named_table]),
ets:new(?ATIMES, [ordered_set, protected, named_table]),
+ ets:new(?OPENERS, [bag, public, named_table]),
ok = config:listen_for_changes(?MODULE, nil),
SizeList = config:get("mem3", "shard_cache_size", "25000"),
+ WriteTimeout = config:get_integer("mem3", "shard_write_timeout", 1000),
UpdateSeq = get_update_seq(),
{ok, #st{
max_size = list_to_integer(SizeList),
cur_size = 0,
changes_pid = start_changes_listener(UpdateSeq),
- update_seq = UpdateSeq
+ update_seq = UpdateSeq,
+ write_timeout = WriteTimeout
handle_call({set_max_size, Size}, _From, St) ->
@@ -205,6 +217,8 @@ handle_call({set_max_size, Size}, _From, St) ->
handle_call(shard_db_changed, _From, St) ->
exit(St#st.changes_pid, shard_db_changed),
{reply, ok, St};
+handle_call({set_write_timeout, Timeout}, _From, St) ->
+ {reply, ok, St#st{write_timeout = Timeout}};
handle_call(_Call, _From, St) ->
{noreply, St}.
@@ -212,23 +226,25 @@ handle_cast({cache_hit, DbName}, St) ->
couch_stats:increment_counter([mem3, shard_cache, hit]),
{noreply, St};
-handle_cast({cache_insert, DbName, Shards, UpdateSeq}, St) ->
- couch_stats:increment_counter([mem3, shard_cache, miss]),
+handle_cast({cache_insert, DbName, Writer, UpdateSeq}, St) ->
% This comparison correctly uses the `<` operator
% and not `=<`. The easiest way to understand why is
% to think of when a _dbs db doesn't change. If it used
% `=<` it would be impossible to insert anything into
% the cache.
NewSt = case UpdateSeq < St#st.update_seq of
- true -> St;
- false -> cache_free(cache_insert(St, DbName, Shards))
+ true ->
+ Writer ! cancel,
+ St;
+ false ->
+ cache_free(cache_insert(St, DbName, Writer, St#st.write_timeout))
{noreply, NewSt};
handle_cast({cache_remove, DbName}, St) ->
couch_stats:increment_counter([mem3, shard_cache, eviction]),
{noreply, cache_remove(St, DbName)};
-handle_cast({cache_insert_change, DbName, Shards, UpdateSeq}, St) ->
- Msg = {cache_insert, DbName, Shards, UpdateSeq},
+handle_cast({cache_insert_change, DbName, Writer, UpdateSeq}, St) ->
+ Msg = {cache_insert, DbName, Writer, UpdateSeq},
{noreply, NewSt} = handle_cast(Msg, St),
{noreply, NewSt#st{update_seq = UpdateSeq}};
handle_cast({cache_remove_change, DbName, UpdateSeq}, St) ->
@@ -333,7 +349,11 @@ changes_callback({change, {Change}, _}, _) ->
[DbName, Reason]);
{Doc} ->
Shards = mem3_util:build_ordered_shards(DbName, Doc),
- Msg = {cache_insert_change, DbName, Shards, Seq},
+ IdleTimeout = config:get_integer(
+ "mem3", "writer_idle_timeout", 30000),
+ Writer = spawn_shard_writer(DbName, Shards, IdleTimeout),
+ ets:insert(?OPENERS, {DbName, Writer}),
+ Msg = {cache_insert_change, DbName, Writer, Seq},
gen_server:cast(?MODULE, Msg),
[create_if_missing(mem3:name(S)) || S
<- Shards, mem3:node(S) =:= node()]
@@ -345,6 +365,7 @@ changes_callback(timeout, _) ->
load_shards_from_disk(DbName) when is_binary(DbName) ->
+ couch_stats:increment_counter([mem3, shard_cache, miss]),
X = ?l2b(config:get("mem3", "shards_db", "_dbs")),
{ok, Db} = mem3_util:ensure_exists(X),
@@ -358,7 +379,19 @@ load_shards_from_db(#db{} = ShardDb, DbName) ->
{ok, #doc{body = {Props}}} ->
Seq = couch_db:get_update_seq(ShardDb),
Shards = mem3_util:build_ordered_shards(DbName, Props),
- gen_server:cast(?MODULE, {cache_insert, DbName, Shards, Seq}),
+ IdleTimeout = config:get_integer("mem3", "writer_idle_timeout", 30000),
+ case maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) of
+ Writer when is_pid(Writer) ->
+ case ets:insert_new(?OPENERS, {DbName, Writer}) of
+ true ->
+ Msg = {cache_insert, DbName, Writer, Seq},
+ gen_server:cast(?MODULE, Msg);
+ false ->
+ Writer ! cancel
+ end;
+ ignore ->
+ ok
+ end,
{not_found, _} ->
erlang:error(database_does_not_exist, ?b2l(DbName))
@@ -389,10 +422,10 @@ create_if_missing(Name) ->
-cache_insert(#st{cur_size=Cur}=St, DbName, Shards) ->
+cache_insert(#st{cur_size=Cur}=St, DbName, Writer, Timeout) ->
NewATime = now(),
true = ets:delete(?SHARDS, DbName),
- true = ets:insert(?SHARDS, Shards),
+ flush_write(DbName, Writer, Timeout),
case ets:lookup(?DBS, DbName) of
[{DbName, ATime}] ->
true = ets:delete(?ATIMES, ATime),
@@ -443,6 +476,43 @@ cache_clear(St) ->
true = ets:delete_all_objects(?ATIMES),
+maybe_spawn_shard_writer(DbName, Shards) ->
+ case ets:member(?OPENERS, DbName) of
+ true ->
+ ignore;
+ false ->
+ spawn_shard_writer(DbName, Shards)
+ end.
+spawn_shard_writer(DbName, Shards) ->
+ erlang:spawn(fun() -> shard_writer(DbName, Shards) end).
+shard_writer(DbName, Shards) ->
+ try
+ receive
+ write ->
+ true = ets:insert(?SHARDS, Shards);
+ cancel ->
+ ok
+ ok
+ end
+ after
+ true = ets:delete_object(?OPENERS, {DbName, self()})
+ end.
+flush_write(DbName, Writer) ->
+ Ref = erlang:monitor(process, Writer),
+ Writer ! write,
+ receive
+ {'DOWN', Ref, _, _, normal} ->
+ ok;
+ {'DOWN', Ref, _, _, Error} ->
+ erlang:exit({mem3_shards_bad_write, Error})
+ after ?WRITE_TIMEOUT ->
+ erlang:exit({mem3_shards_write_timeout, DbName})
+ end.
filter_shards_by_name(Name, Shards) ->
filter_shards_by_name(Name, [], Shards).