diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2017-05-05 03:21:43 -0400 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2017-05-05 10:24:05 -0400 |
commit | 50a738a46701332046a467d070309ef832482656 (patch) | |
tree | 000a9ebff963bbc14bc3aeb727e7863de9623dc1 | |
parent | 4e983fc23c0982e9a9fead510c00f74e3cdbd676 (diff) | |
download | couchdb-50a738a46701332046a467d070309ef832482656.tar.gz |
Add jittered sleep during replicator shard scanning
This is bringing back previous code:
https://github.com/apache/couchdb/blob/884cf3e55f77ab1a5f26dc7202ce21771062eae6/src/couch_replicator_manager.erl#L940-L946
This is to avoid a stampede during startup when potentially a large number
of shards are found and change feeds have to be opened for all of them at the
same time.
The average jitter value starts at 10 msec for the first shard, then goes up to
1 minute for the 6000th shard and stays clamped at 1 minute afterwards. (Note:
that's the average; the actual range is 1 msec -> 2 * average, as this is a
uniform random distribution).
Some sample average delays, by number of shards found so far:
* 100th shard - 1 second
* 1000th shard - 10 seconds
* 6000th shard and higher - 1 minute
Jira: COUCHDB-3389
-rw-r--r-- | src/couch/src/couch_multidb_changes.erl | 91 |
1 files changed, 50 insertions, 41 deletions
diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl index b9cddd18c..5efcccaac 100644 --- a/src/couch/src/couch_multidb_changes.erl +++ b/src/couch/src/couch_multidb_changes.erl @@ -37,8 +37,8 @@ -define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}). --define(AVG_DELAY_MSEC, 100). --define(MAX_DELAY_MSEC, 60000). +-define(AVG_DELAY_MSEC, 10). +-define(MAX_DELAY_MSEC, 120000). -record(state, { tid :: ets:tid(), @@ -257,40 +257,54 @@ scan_all_dbs(Server, DbSuffix) when is_pid(Server) -> {ok, Db} = mem3_util:ensure_exists( config:get("mem3", "shards_db", "_dbs")), ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil), - ChangesFun(fun({change, {Change}, _}, _) -> - DbName = couch_util:get_value(<<"id">>, Change), - case DbName of - <<"_design/", _/binary>> -> - ok; - _Else -> - case couch_replicator_utils:is_deleted(Change) of - true -> - ok; - false -> - [gen_server:cast(Server, {resume_scan, ShardName}) - || ShardName <- filter_shards(DbName, DbSuffix)], - ok - end - end; - (_, _) -> ok - end), + ChangesFun({fun scan_changes_cb/3, {Server, DbSuffix, 1}}), couch_db:close(Db). -filter_shards(DbName, DbSuffix) -> - case DbSuffix =:= couch_db:dbname_suffix(DbName) of - false -> - []; - true -> - try - [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)] - catch - error:database_does_not_exist -> - [] +scan_changes_cb({change, {Change}, _}, _, {_Server, DbSuffix, _Count} = Acc) -> + DbName = couch_util:get_value(<<"id">>, Change), + case DbName of <<"_design/", _/binary>> -> Acc; _Else -> + NameMatch = DbSuffix =:= couch_db:dbname_suffix(DbName), + case {NameMatch, couch_replicator_utils:is_deleted(Change)} of + {false, _} -> + Acc; + {true, true} -> + Acc; + {true, false} -> + Shards = local_shards(DbName), + lists:foldl(fun notify_fold/2, Acc, Shards) end + end; +scan_changes_cb(_, _, Acc) -> + Acc. 
+ + +local_shards(DbName) -> + try + [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)] + catch + error:database_does_not_exist -> + [] end. +notify_fold(DbName, {Server, DbSuffix, Count}) -> + Jitter = jitter(Count), + spawn_link(fun() -> + timer:sleep(Jitter), + gen_server:cast(Server, {resume_scan, DbName}) + end), + {Server, DbSuffix, Count + 1}. + + +% Jitter is proportional to the number of shards found so far. This is done to +% avoid a stampede and notifying the callback function with potentially a large +% number of shards back to back during startup. +jitter(N) -> + Range = min(2 * N * ?AVG_DELAY_MSEC, ?MAX_DELAY_MSEC), + random:uniform(Range). + + scan_local_db(Server, DbSuffix) when is_pid(Server) -> case couch_db:open_int(DbSuffix, [?CTX, sys_db, nologifmissing]) of {ok, Db} -> @@ -716,31 +730,26 @@ scan_dbs_test_() -> fun() -> test_util:start_couch([mem3, fabric]) end, fun(Ctx) -> test_util:stop_couch(Ctx) end, [ - t_pass_shard(), - t_fail_shard(), + t_find_shard(), + t_shard_not_found(), t_pass_local(), t_fail_local() ] }. -t_pass_shard() -> +t_find_shard() -> ?_test(begin - DbName0 = ?tempdb(), - DbSuffix = <<"_replicator">>, - DbName = <<DbName0/binary, "/", DbSuffix/binary>>, + DbName = ?tempdb(), ok = fabric:create_db(DbName, [?CTX]), - ?assertEqual(8, length(filter_shards(DbName, DbSuffix))), + ?assertEqual(8, length(local_shards(DbName))), fabric:delete_db(DbName, [?CTX]) end). -t_fail_shard() -> +t_shard_not_found() -> ?_test(begin - DbName = ?tempdb(), - ok = fabric:create_db(DbName, [?CTX]), - ?assertEqual([], filter_shards(DbName, <<"_replicator">>)), - fabric:delete_db(DbName, [?CTX]) + ?assertEqual([], local_shards(?tempdb())) end). |