summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2018-10-02 00:33:21 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2018-10-02 10:08:36 -0400
commit080da7cbad751a85394f700610c0a45da79651ae (patch)
tree634b9a8adc23bb9e3f1a2097001aa74c514af68c
parent691605b9336acdc2daf76ae27eaabe23a70e8e63 (diff)
downloadcouchdb-080da7cbad751a85394f700610c0a45da79651ae.tar.gz
Avoid restarting /_replicate jobs if parameters haven't changed
This used to be the case before the scheduling replicator: https://github.com/apache/couchdb-couch-replicator/blob/master/src/couch_replicator.erl#L166 This is also how replications backed by a document in a _replicator db behave: https://github.com/apache/couchdb/blob/master/src/couch_replicator/src/couch_replicator_doc_processor.erl#L283
-rw-r--r--src/couch_replicator/src/couch_replicator_doc_processor.erl51
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler.erl53
-rw-r--r--src/couch_replicator/src/couch_replicator_utils.erl49
3 files changed, 98 insertions, 55 deletions
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index d3c001f26..1b43598da 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -264,7 +264,9 @@ code_change(_OldVsn, State, _Extra) ->
% same document.
-spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok.
updated_doc(Id, Rep, Filter) ->
- case normalize_rep(current_rep(Id)) == normalize_rep(Rep) of
+ NormCurRep = couch_replicator_utils:normalize_rep(current_rep(Id)),
+ NormNewRep = couch_replicator_utils:normalize_rep(Rep),
+ case NormCurRep == NormNewRep of
false ->
removed_doc(Id),
Row = #rdoc{
@@ -304,25 +306,6 @@ current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
end.
-% Normalize a #rep{} record such that it doesn't contain time dependent fields
-% pids (like httpc pools), and options / props are sorted. This function would
-% used during comparisons.
--spec normalize_rep(#rep{} | nil) -> #rep{} | nil.
-normalize_rep(nil) ->
- nil;
-
-normalize_rep(#rep{} = Rep)->
- #rep{
- source = couch_replicator_api_wrap:normalize_db(Rep#rep.source),
- target = couch_replicator_api_wrap:normalize_db(Rep#rep.target),
- options = Rep#rep.options, % already sorted in make_options/1
- type = Rep#rep.type,
- view = Rep#rep.view,
- doc_id = Rep#rep.doc_id,
- db_name = Rep#rep.db_name
- }.
-
-
-spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok.
worker_returned(Ref, Id, {ok, RepId}) ->
case ets:lookup(?MODULE, Id) of
@@ -819,34 +802,6 @@ t_cluster_membership_foldl() ->
end).
-normalize_rep_test_() ->
- {
- setup,
- fun() -> meck:expect(config, get,
- fun(_, _, Default) -> Default end)
- end,
- fun(_) -> meck:unload() end,
- ?_test(begin
- EJson1 = {[
- {<<"source">>, <<"http://host.com/source_db">>},
- {<<"target">>, <<"local">>},
- {<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]},
- {<<"other_field">>, <<"some_value">>}
- ]},
- Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1),
- EJson2 = {[
- {<<"other_field">>, <<"unrelated">>},
- {<<"target">>, <<"local">>},
- {<<"source">>, <<"http://host.com/source_db">>},
- {<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]},
- {<<"other_field2">>, <<"unrelated2">>}
- ]},
- Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2),
- ?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2))
- end)
- }.
-
-
get_worker_ref_test_() ->
{
setup,
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index 50896c548..762ef18fe 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -102,12 +102,17 @@ start_link() ->
-spec add_job(#rep{}) -> ok.
add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
- Job = #job{
- id = Rep#rep.id,
- rep = Rep,
- history = [{added, os:timestamp()}]
- },
- gen_server:call(?MODULE, {add_job, Job}, infinity).
+ case existing_replication(Rep) of
+ false ->
+ Job = #job{
+ id = Rep#rep.id,
+ rep = Rep,
+ history = [{added, os:timestamp()}]
+ },
+ gen_server:call(?MODULE, {add_job, Job}, infinity);
+ true ->
+ ok
+ end.
-spec remove_job(job_id()) -> ok.
@@ -925,6 +930,17 @@ stats_fold(#job{pid = P, history = [{started, _} | _]}, Acc) when is_pid(P) ->
Acc#stats_acc{running_n = Acc#stats_acc.running_n + 1}.
+-spec existing_replication(#rep{}) -> boolean().
+existing_replication(#rep{} = NewRep) ->
+ case job_by_id(NewRep#rep.id) of
+ {ok, #job{rep = CurRep}} ->
+ NormCurRep = couch_replicator_utils:normalize_rep(CurRep),
+ NormNewRep = couch_replicator_utils:normalize_rep(NewRep),
+ NormCurRep == NormNewRep;
+ {error, not_found} ->
+ false
+ end.
+
-ifdef(TEST).
@@ -1017,6 +1033,7 @@ scheduler_test_() ->
t_start_oldest_first(),
t_dont_stop_if_nothing_pending(),
t_max_churn_limits_number_of_rotated_jobs(),
+ t_existing_jobs(),
t_if_pending_less_than_running_start_all_pending(),
t_running_less_than_pending_swap_all_running(),
t_oneshot_dont_get_rotated(),
@@ -1308,6 +1325,30 @@ t_if_permanent_job_crashes_it_stays_in_ets() ->
end).
+t_existing_jobs() ->
+ ?_test(begin
+ Rep = #rep{
+ id = job1,
+ db_name = <<"db">>,
+ source = <<"s">>,
+ target = <<"t">>,
+ options = [{continuous, true}]
+ },
+ setup_jobs([#job{id = Rep#rep.id, rep = Rep}]),
+ NewRep = #rep{
+ id = Rep#rep.id,
+ db_name = <<"db">>,
+ source = <<"s">>,
+ target = <<"t">>,
+ options = [{continuous, true}]
+ },
+ ?assert(existing_replication(NewRep)),
+ ?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})),
+ ?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})),
+ ?assertNot(existing_replication(NewRep#rep{options = []}))
+ end).
+
+
t_job_summary_running() ->
?_test(begin
Job = #job{
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index 218fcf501..b0d706953 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -28,7 +28,8 @@
pp_rep_id/1,
iso8601/1,
filter_state/3,
- remove_basic_auth_from_headers/1
+ remove_basic_auth_from_headers/1,
+ normalize_rep/1
]).
-export([
@@ -208,6 +209,25 @@ decode_basic_creds(Base64) ->
end.
+% Normalize a #rep{} record such that it doesn't contain time dependent fields
+% pids (like httpc pools), and options / props are sorted. This function would
+% used during comparisons.
+-spec normalize_rep(#rep{} | nil) -> #rep{} | nil.
+normalize_rep(nil) ->
+ nil;
+
+normalize_rep(#rep{} = Rep)->
+ #rep{
+ source = couch_replicator_api_wrap:normalize_db(Rep#rep.source),
+ target = couch_replicator_api_wrap:normalize_db(Rep#rep.target),
+ options = Rep#rep.options, % already sorted in make_options/1
+ type = Rep#rep.type,
+ view = Rep#rep.view,
+ doc_id = Rep#rep.doc_id,
+ db_name = Rep#rep.db_name
+ }.
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -259,4 +279,31 @@ b64creds(User, Pass) ->
base64:encode_to_string(User ++ ":" ++ Pass).
+normalize_rep_test_() ->
+ {
+ setup,
+ fun() -> meck:expect(config, get,
+ fun(_, _, Default) -> Default end)
+ end,
+ fun(_) -> meck:unload() end,
+ ?_test(begin
+ EJson1 = {[
+ {<<"source">>, <<"http://host.com/source_db">>},
+ {<<"target">>, <<"local">>},
+ {<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]},
+ {<<"other_field">>, <<"some_value">>}
+ ]},
+ Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1),
+ EJson2 = {[
+ {<<"other_field">>, <<"unrelated">>},
+ {<<"target">>, <<"local">>},
+ {<<"source">>, <<"http://host.com/source_db">>},
+ {<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]},
+ {<<"other_field2">>, <<"unrelated2">>}
+ ]},
+ Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2),
+ ?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2))
+ end)
+ }.
+
-endif.