summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Nick Vatamaniuc <nickva@users.noreply.github.com> 2017-05-05 15:35:56 -0400
committer GitHub <noreply@github.com> 2017-05-05 15:35:56 -0400
commit 4a63d22b363e2059ababb775a9f0b0ca685a87bc (patch)
tree 000a9ebff963bbc14bc3aeb727e7863de9623dc1
parent 4e983fc23c0982e9a9fead510c00f74e3cdbd676 (diff)
parent 50a738a46701332046a467d070309ef832482656 (diff)
download couchdb-4a63d22b363e2059ababb775a9f0b0ca685a87bc.tar.gz
Merge pull request #484 from cloudant/couchdb-3389
Apply random jitter during initial _replicator shard discovery
-rw-r--r-- src/couch/src/couch_multidb_changes.erl 91
1 files changed, 50 insertions, 41 deletions
diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl
index b9cddd18c..5efcccaac 100644
--- a/src/couch/src/couch_multidb_changes.erl
+++ b/src/couch/src/couch_multidb_changes.erl
@@ -37,8 +37,8 @@
-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
--define(AVG_DELAY_MSEC, 100).
--define(MAX_DELAY_MSEC, 60000).
+-define(AVG_DELAY_MSEC, 10).  % per-shard step of the jitter range, in msec (see jitter/1)
+-define(MAX_DELAY_MSEC, 120000).  % cap on the jitter range, in msec (see jitter/1)
-record(state, {
tid :: ets:tid(),
@@ -257,40 +257,54 @@ scan_all_dbs(Server, DbSuffix) when is_pid(Server) ->
{ok, Db} = mem3_util:ensure_exists(
config:get("mem3", "shards_db", "_dbs")),
ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
- ChangesFun(fun({change, {Change}, _}, _) ->
- DbName = couch_util:get_value(<<"id">>, Change),
- case DbName of
- <<"_design/", _/binary>> ->
- ok;
- _Else ->
- case couch_replicator_utils:is_deleted(Change) of
- true ->
- ok;
- false ->
- [gen_server:cast(Server, {resume_scan, ShardName})
- || ShardName <- filter_shards(DbName, DbSuffix)],
- ok
- end
- end;
- (_, _) -> ok
- end),
+ ChangesFun({fun scan_changes_cb/3, {Server, DbSuffix, 1}}),
couch_db:close(Db).
-filter_shards(DbName, DbSuffix) ->
- case DbSuffix =:= couch_db:dbname_suffix(DbName) of
- false ->
- [];
- true ->
- try
- [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
- catch
- error:database_does_not_exist ->
- []
+scan_changes_cb({change, {Change}, _}, _, {_Server, DbSuffix, _Count} = Acc) ->  % changes callback; Acc is {Server, DbSuffix, Count} and is always returned
+ DbName = couch_util:get_value(<<"id">>, Change),  % the "id" field of the change is used as the db name
+ case DbName of <<"_design/", _/binary>> -> Acc; _Else ->  % ids starting with "_design/" are skipped
+ NameMatch = DbSuffix =:= couch_db:dbname_suffix(DbName),  % only dbs whose suffix equals DbSuffix are of interest
+ case {NameMatch, couch_replicator_utils:is_deleted(Change)} of
+ {false, _} ->  % suffix mismatch: accumulator unchanged
+ Acc;
+ {true, true} ->  % suffix matches but the change is a deletion: accumulator unchanged
+ Acc;
+ {true, false} ->  % suffix matches and not deleted: notify about every local shard
+ Shards = local_shards(DbName),
+ lists:foldl(fun notify_fold/2, Acc, Shards)  % notify_fold/2 returns the accumulator with Count advanced
end
+ end;
+scan_changes_cb(_, _, Acc) ->  % any other event leaves the accumulator untouched
+ Acc.
+
+
+% Return the names of the shards that mem3:local_shards/1 reports for DbName.
+% A database_does_not_exist error raised by mem3 is mapped to an empty list;
+% any other exception propagates to the caller.
+local_shards(DbName) ->
+    try mem3:local_shards(DbName) of
+        Shards ->
+            [Name || #shard{name = Name} <- Shards]
+    catch
+        error:database_does_not_exist ->
+            []
end.
+% Fold step applied to each shard name found during the scan. Schedules a
+% delayed {resume_scan, DbName} cast to Server and returns the accumulator
+% with the shard counter advanced by one.
+notify_fold(DbName, {Server, DbSuffix, Count}) ->
+    % The delay is computed here, in the scanning process, before spawning.
+    Delay = jitter(Count),
+    Notify = fun() ->
+        timer:sleep(Delay),
+        gen_server:cast(Server, {resume_scan, DbName})
+    end,
+    _Pid = spawn_link(Notify),
+    {Server, DbSuffix, Count + 1}.
+
+
+% Return a random delay, in milliseconds, to apply before notifying the server
+% about the N-th shard found so far. The delay is drawn uniformly from
+% 1..Range, where Range grows linearly with N (2 * N * ?AVG_DELAY_MSEC, so the
+% mean is roughly N * ?AVG_DELAY_MSEC) and is capped at ?MAX_DELAY_MSEC. This
+% spreads notifications out and avoids a stampede of back-to-back resume_scan
+% casts when a large number of shards is discovered during startup.
+jitter(N) ->
+    Range = min(2 * N * ?AVG_DELAY_MSEC, ?MAX_DELAY_MSEC),
+    % rand seeds itself per process. The deprecated random module starts every
+    % unseeded process from the same default seed, so each node would compute
+    % the same sequence of delays.
+    rand:uniform(Range).
+
+
scan_local_db(Server, DbSuffix) when is_pid(Server) ->
case couch_db:open_int(DbSuffix, [?CTX, sys_db, nologifmissing]) of
{ok, Db} ->
@@ -716,31 +730,26 @@ scan_dbs_test_() ->
fun() -> test_util:start_couch([mem3, fabric]) end,
fun(Ctx) -> test_util:stop_couch(Ctx) end,
[
- t_pass_shard(),
- t_fail_shard(),
+ t_find_shard(),
+ t_shard_not_found(),
t_pass_local(),
t_fail_local()
]
}.
-t_pass_shard() ->
+t_find_shard() ->  % local_shards/1 should list the shards of a freshly created clustered db
?_test(begin
- DbName0 = ?tempdb(),
- DbSuffix = <<"_replicator">>,
- DbName = <<DbName0/binary, "/", DbSuffix/binary>>,
+ DbName = ?tempdb(),
ok = fabric:create_db(DbName, [?CTX]),
- ?assertEqual(8, length(filter_shards(DbName, DbSuffix))),
+ ?assertEqual(8, length(local_shards(DbName))),  % NOTE(review): 8 presumably reflects the default shard count of the test setup; confirm
fabric:delete_db(DbName, [?CTX])  % remove the temporary db again
end).
-t_fail_shard() ->
+t_shard_not_found() ->  % local_shards/1 should return [] for a db that was never created
?_test(begin
- DbName = ?tempdb(),
- ok = fabric:create_db(DbName, [?CTX]),
- ?assertEqual([], filter_shards(DbName, <<"_replicator">>)),
- fabric:delete_db(DbName, [?CTX])
+ ?assertEqual([], local_shards(?tempdb()))  % presumably ?tempdb() yields a fresh, unused name; confirm
end).