diff options
author | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2017-05-05 15:35:56 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-05-05 15:35:56 -0400 |
commit | 4a63d22b363e2059ababb775a9f0b0ca685a87bc (patch) | |
tree | 000a9ebff963bbc14bc3aeb727e7863de9623dc1 | |
parent | 4e983fc23c0982e9a9fead510c00f74e3cdbd676 (diff) | |
parent | 50a738a46701332046a467d070309ef832482656 (diff) | |
download | couchdb-4a63d22b363e2059ababb775a9f0b0ca685a87bc.tar.gz |
Merge pull request #484 from cloudant/couchdb-3389
Apply random jitter during initial _replicator shard discovery
-rw-r--r-- | src/couch/src/couch_multidb_changes.erl | 91 |
1 files changed, 50 insertions, 41 deletions
diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl index b9cddd18c..5efcccaac 100644 --- a/src/couch/src/couch_multidb_changes.erl +++ b/src/couch/src/couch_multidb_changes.erl @@ -37,8 +37,8 @@ -define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}). --define(AVG_DELAY_MSEC, 100). --define(MAX_DELAY_MSEC, 60000). +-define(AVG_DELAY_MSEC, 10). +-define(MAX_DELAY_MSEC, 120000). -record(state, { tid :: ets:tid(), @@ -257,40 +257,54 @@ scan_all_dbs(Server, DbSuffix) when is_pid(Server) -> {ok, Db} = mem3_util:ensure_exists( config:get("mem3", "shards_db", "_dbs")), ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil), - ChangesFun(fun({change, {Change}, _}, _) -> - DbName = couch_util:get_value(<<"id">>, Change), - case DbName of - <<"_design/", _/binary>> -> - ok; - _Else -> - case couch_replicator_utils:is_deleted(Change) of - true -> - ok; - false -> - [gen_server:cast(Server, {resume_scan, ShardName}) - || ShardName <- filter_shards(DbName, DbSuffix)], - ok - end - end; - (_, _) -> ok - end), + ChangesFun({fun scan_changes_cb/3, {Server, DbSuffix, 1}}), couch_db:close(Db). -filter_shards(DbName, DbSuffix) -> - case DbSuffix =:= couch_db:dbname_suffix(DbName) of - false -> - []; - true -> - try - [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)] - catch - error:database_does_not_exist -> - [] +scan_changes_cb({change, {Change}, _}, _, {_Server, DbSuffix, _Count} = Acc) -> + DbName = couch_util:get_value(<<"id">>, Change), + case DbName of <<"_design/", _/binary>> -> Acc; _Else -> + NameMatch = DbSuffix =:= couch_db:dbname_suffix(DbName), + case {NameMatch, couch_replicator_utils:is_deleted(Change)} of + {false, _} -> + Acc; + {true, true} -> + Acc; + {true, false} -> + Shards = local_shards(DbName), + lists:foldl(fun notify_fold/2, Acc, Shards) end + end; +scan_changes_cb(_, _, Acc) -> + Acc. + + +local_shards(DbName) -> + try + [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)] + catch + error:database_does_not_exist -> + [] end. +notify_fold(DbName, {Server, DbSuffix, Count}) -> + Jitter = jitter(Count), + spawn_link(fun() -> + timer:sleep(Jitter), + gen_server:cast(Server, {resume_scan, DbName}) + end), + {Server, DbSuffix, Count + 1}. + + +% Jitter is proportional to the number of shards found so far. This is done to +% avoid a stampede and notifying the callback function with potentially a large +% number of shards back to back during startup. +jitter(N) -> + Range = min(2 * N * ?AVG_DELAY_MSEC, ?MAX_DELAY_MSEC), + random:uniform(Range). + + scan_local_db(Server, DbSuffix) when is_pid(Server) -> case couch_db:open_int(DbSuffix, [?CTX, sys_db, nologifmissing]) of {ok, Db} -> @@ -716,31 +730,26 @@ scan_dbs_test_() -> fun() -> test_util:start_couch([mem3, fabric]) end, fun(Ctx) -> test_util:stop_couch(Ctx) end, [ - t_pass_shard(), - t_fail_shard(), + t_find_shard(), + t_shard_not_found(), t_pass_local(), t_fail_local() ] }. -t_pass_shard() -> +t_find_shard() -> ?_test(begin - DbName0 = ?tempdb(), - DbSuffix = <<"_replicator">>, - DbName = <<DbName0/binary, "/", DbSuffix/binary>>, + DbName = ?tempdb(), ok = fabric:create_db(DbName, [?CTX]), - ?assertEqual(8, length(filter_shards(DbName, DbSuffix))), + ?assertEqual(8, length(local_shards(DbName))), fabric:delete_db(DbName, [?CTX]) end). -t_fail_shard() -> +t_shard_not_found() -> ?_test(begin - DbName = ?tempdb(), - ok = fabric:create_db(DbName, [?CTX]), - ?assertEqual([], filter_shards(DbName, <<"_replicator">>)), - fabric:delete_db(DbName, [?CTX]) + ?assertEqual([], local_shards(?tempdb())) end). |