summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2017-05-05 03:21:43 -0400
committerNick Vatamaniuc <vatamane@apache.org>2017-05-05 10:24:05 -0400
commit50a738a46701332046a467d070309ef832482656 (patch)
tree000a9ebff963bbc14bc3aeb727e7863de9623dc1
parent4e983fc23c0982e9a9fead510c00f74e3cdbd676 (diff)
downloadcouchdb-50a738a46701332046a467d070309ef832482656.tar.gz
Add jittered sleep during replicator shard scanning
This is bringing back previous code: https://github.com/apache/couchdb/blob/884cf3e55f77ab1a5f26dc7202ce21771062eae6/src/couch_replicator_manager.erl#L940-L946 This is to avoid a stampede during startup when potentially a large number shards are found and change feeds have to be opened for all of them at the same time. The average jitter value starts at 10 msec for first shard, then goes up to 1 minute for 6000th shard and stays clamped at 1 minute afterwards. (Note: that's the average, the range is 1 -> 2 * average as this is a uniform random distribution). Some sample values: * 100 - 1 second * 1000 - 10 seconds * 6000 and higher - 1 minute Jira: COUCHDB-3389
-rw-r--r--src/couch/src/couch_multidb_changes.erl91
1 files changed, 50 insertions, 41 deletions
diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl
index b9cddd18c..5efcccaac 100644
--- a/src/couch/src/couch_multidb_changes.erl
+++ b/src/couch/src/couch_multidb_changes.erl
@@ -37,8 +37,8 @@
-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
--define(AVG_DELAY_MSEC, 100).
--define(MAX_DELAY_MSEC, 60000).
+-define(AVG_DELAY_MSEC, 10).
+-define(MAX_DELAY_MSEC, 120000).
-record(state, {
tid :: ets:tid(),
@@ -257,40 +257,54 @@ scan_all_dbs(Server, DbSuffix) when is_pid(Server) ->
{ok, Db} = mem3_util:ensure_exists(
config:get("mem3", "shards_db", "_dbs")),
ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
- ChangesFun(fun({change, {Change}, _}, _) ->
- DbName = couch_util:get_value(<<"id">>, Change),
- case DbName of
- <<"_design/", _/binary>> ->
- ok;
- _Else ->
- case couch_replicator_utils:is_deleted(Change) of
- true ->
- ok;
- false ->
- [gen_server:cast(Server, {resume_scan, ShardName})
- || ShardName <- filter_shards(DbName, DbSuffix)],
- ok
- end
- end;
- (_, _) -> ok
- end),
+ ChangesFun({fun scan_changes_cb/3, {Server, DbSuffix, 1}}),
couch_db:close(Db).
-filter_shards(DbName, DbSuffix) ->
- case DbSuffix =:= couch_db:dbname_suffix(DbName) of
- false ->
- [];
- true ->
- try
- [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
- catch
- error:database_does_not_exist ->
- []
+scan_changes_cb({change, {Change}, _}, _, {_Server, DbSuffix, _Count} = Acc) ->
+ DbName = couch_util:get_value(<<"id">>, Change),
+ case DbName of <<"_design/", _/binary>> -> Acc; _Else ->
+ NameMatch = DbSuffix =:= couch_db:dbname_suffix(DbName),
+ case {NameMatch, couch_replicator_utils:is_deleted(Change)} of
+ {false, _} ->
+ Acc;
+ {true, true} ->
+ Acc;
+ {true, false} ->
+ Shards = local_shards(DbName),
+ lists:foldl(fun notify_fold/2, Acc, Shards)
end
+ end;
+scan_changes_cb(_, _, Acc) ->
+ Acc.
+
+
+local_shards(DbName) ->
+ try
+ [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
+ catch
+ error:database_does_not_exist ->
+ []
end.
+notify_fold(DbName, {Server, DbSuffix, Count}) ->
+ Jitter = jitter(Count),
+ spawn_link(fun() ->
+ timer:sleep(Jitter),
+ gen_server:cast(Server, {resume_scan, DbName})
+ end),
+ {Server, DbSuffix, Count + 1}.
+
+
+% Jitter is proportional to the number of shards found so far. This is done to
+% avoid a stampede and notifying the callback function with potentially a large
+% number of shards back to back during startup.
+jitter(N) ->
+ Range = min(2 * N * ?AVG_DELAY_MSEC, ?MAX_DELAY_MSEC),
+ random:uniform(Range).
+
+
scan_local_db(Server, DbSuffix) when is_pid(Server) ->
case couch_db:open_int(DbSuffix, [?CTX, sys_db, nologifmissing]) of
{ok, Db} ->
@@ -716,31 +730,26 @@ scan_dbs_test_() ->
fun() -> test_util:start_couch([mem3, fabric]) end,
fun(Ctx) -> test_util:stop_couch(Ctx) end,
[
- t_pass_shard(),
- t_fail_shard(),
+ t_find_shard(),
+ t_shard_not_found(),
t_pass_local(),
t_fail_local()
]
}.
-t_pass_shard() ->
+t_find_shard() ->
?_test(begin
- DbName0 = ?tempdb(),
- DbSuffix = <<"_replicator">>,
- DbName = <<DbName0/binary, "/", DbSuffix/binary>>,
+ DbName = ?tempdb(),
ok = fabric:create_db(DbName, [?CTX]),
- ?assertEqual(8, length(filter_shards(DbName, DbSuffix))),
+ ?assertEqual(8, length(local_shards(DbName))),
fabric:delete_db(DbName, [?CTX])
end).
-t_fail_shard() ->
+t_shard_not_found() ->
?_test(begin
- DbName = ?tempdb(),
- ok = fabric:create_db(DbName, [?CTX]),
- ?assertEqual([], filter_shards(DbName, <<"_replicator">>)),
- fabric:delete_db(DbName, [?CTX])
+ ?assertEqual([], local_shards(?tempdb()))
end).