diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2018-10-02 00:33:21 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2018-10-02 10:08:36 -0400 |
commit | 080da7cbad751a85394f700610c0a45da79651ae (patch) | |
tree | 634b9a8adc23bb9e3f1a2097001aa74c514af68c | |
parent | 691605b9336acdc2daf76ae27eaabe23a70e8e63 (diff) | |
download | couchdb-080da7cbad751a85394f700610c0a45da79651ae.tar.gz |
Avoid restarting /_replicate jobs if parameters haven't changed
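Before the scheduling replicator was introduced, a repeated /_replicate request with unchanged parameters left the already existing job alone instead of restarting it: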
https://github.com/apache/couchdb-couch-replicator/blob/master/src/couch_replicator.erl#L166
This is also how replications backed by a document in a _replicator db behave:
https://github.com/apache/couchdb/blob/master/src/couch_replicator/src/couch_replicator_doc_processor.erl#L283
3 files changed, 98 insertions, 55 deletions
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl index d3c001f26..1b43598da 100644 --- a/src/couch_replicator/src/couch_replicator_doc_processor.erl +++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl @@ -264,7 +264,9 @@ code_change(_OldVsn, State, _Extra) -> % same document. -spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok. updated_doc(Id, Rep, Filter) -> - case normalize_rep(current_rep(Id)) == normalize_rep(Rep) of + NormCurRep = couch_replicator_utils:normalize_rep(current_rep(Id)), + NormNewRep = couch_replicator_utils:normalize_rep(Rep), + case NormCurRep == NormNewRep of false -> removed_doc(Id), Row = #rdoc{ @@ -304,25 +306,6 @@ current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) -> end. -% Normalize a #rep{} record such that it doesn't contain time dependent fields -% pids (like httpc pools), and options / props are sorted. This function would -% used during comparisons. --spec normalize_rep(#rep{} | nil) -> #rep{} | nil. -normalize_rep(nil) -> - nil; - -normalize_rep(#rep{} = Rep)-> - #rep{ - source = couch_replicator_api_wrap:normalize_db(Rep#rep.source), - target = couch_replicator_api_wrap:normalize_db(Rep#rep.target), - options = Rep#rep.options, % already sorted in make_options/1 - type = Rep#rep.type, - view = Rep#rep.view, - doc_id = Rep#rep.doc_id, - db_name = Rep#rep.db_name - }. - - -spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok. worker_returned(Ref, Id, {ok, RepId}) -> case ets:lookup(?MODULE, Id) of @@ -819,34 +802,6 @@ t_cluster_membership_foldl() -> end). 
-normalize_rep_test_() -> - { - setup, - fun() -> meck:expect(config, get, - fun(_, _, Default) -> Default end) - end, - fun(_) -> meck:unload() end, - ?_test(begin - EJson1 = {[ - {<<"source">>, <<"http://host.com/source_db">>}, - {<<"target">>, <<"local">>}, - {<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]}, - {<<"other_field">>, <<"some_value">>} - ]}, - Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1), - EJson2 = {[ - {<<"other_field">>, <<"unrelated">>}, - {<<"target">>, <<"local">>}, - {<<"source">>, <<"http://host.com/source_db">>}, - {<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]}, - {<<"other_field2">>, <<"unrelated2">>} - ]}, - Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2), - ?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2)) - end) - }. - - get_worker_ref_test_() -> { setup, diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index 50896c548..762ef18fe 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -102,12 +102,17 @@ start_link() -> -spec add_job(#rep{}) -> ok. add_job(#rep{} = Rep) when Rep#rep.id /= undefined -> - Job = #job{ - id = Rep#rep.id, - rep = Rep, - history = [{added, os:timestamp()}] - }, - gen_server:call(?MODULE, {add_job, Job}, infinity). + case existing_replication(Rep) of + false -> + Job = #job{ + id = Rep#rep.id, + rep = Rep, + history = [{added, os:timestamp()}] + }, + gen_server:call(?MODULE, {add_job, Job}, infinity); + true -> + ok + end. -spec remove_job(job_id()) -> ok. @@ -925,6 +930,17 @@ stats_fold(#job{pid = P, history = [{started, _} | _]}, Acc) when is_pid(P) -> Acc#stats_acc{running_n = Acc#stats_acc.running_n + 1}. +-spec existing_replication(#rep{}) -> boolean(). 
+existing_replication(#rep{} = NewRep) -> + case job_by_id(NewRep#rep.id) of + {ok, #job{rep = CurRep}} -> + NormCurRep = couch_replicator_utils:normalize_rep(CurRep), + NormNewRep = couch_replicator_utils:normalize_rep(NewRep), + NormCurRep == NormNewRep; + {error, not_found} -> + false + end. + -ifdef(TEST). @@ -1017,6 +1033,7 @@ scheduler_test_() -> t_start_oldest_first(), t_dont_stop_if_nothing_pending(), t_max_churn_limits_number_of_rotated_jobs(), + t_existing_jobs(), t_if_pending_less_than_running_start_all_pending(), t_running_less_than_pending_swap_all_running(), t_oneshot_dont_get_rotated(), @@ -1308,6 +1325,30 @@ t_if_permanent_job_crashes_it_stays_in_ets() -> end). +t_existing_jobs() -> + ?_test(begin + Rep = #rep{ + id = job1, + db_name = <<"db">>, + source = <<"s">>, + target = <<"t">>, + options = [{continuous, true}] + }, + setup_jobs([#job{id = Rep#rep.id, rep = Rep}]), + NewRep = #rep{ + id = Rep#rep.id, + db_name = <<"db">>, + source = <<"s">>, + target = <<"t">>, + options = [{continuous, true}] + }, + ?assert(existing_replication(NewRep)), + ?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})), + ?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})), + ?assertNot(existing_replication(NewRep#rep{options = []})) + end). + + t_job_summary_running() -> ?_test(begin Job = #job{ diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl index 218fcf501..b0d706953 100644 --- a/src/couch_replicator/src/couch_replicator_utils.erl +++ b/src/couch_replicator/src/couch_replicator_utils.erl @@ -28,7 +28,8 @@ pp_rep_id/1, iso8601/1, filter_state/3, - remove_basic_auth_from_headers/1 + remove_basic_auth_from_headers/1, + normalize_rep/1 ]). -export([ @@ -208,6 +209,25 @@ decode_basic_creds(Base64) -> end. +% Normalize a #rep{} record such that it doesn't contain time dependent fields +% pids (like httpc pools), and options / props are sorted. 
This function would +% used during comparisons. +-spec normalize_rep(#rep{} | nil) -> #rep{} | nil. +normalize_rep(nil) -> + nil; + +normalize_rep(#rep{} = Rep)-> + #rep{ + source = couch_replicator_api_wrap:normalize_db(Rep#rep.source), + target = couch_replicator_api_wrap:normalize_db(Rep#rep.target), + options = Rep#rep.options, % already sorted in make_options/1 + type = Rep#rep.type, + view = Rep#rep.view, + doc_id = Rep#rep.doc_id, + db_name = Rep#rep.db_name + }. + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -259,4 +279,31 @@ b64creds(User, Pass) -> base64:encode_to_string(User ++ ":" ++ Pass). +normalize_rep_test_() -> + { + setup, + fun() -> meck:expect(config, get, + fun(_, _, Default) -> Default end) + end, + fun(_) -> meck:unload() end, + ?_test(begin + EJson1 = {[ + {<<"source">>, <<"http://host.com/source_db">>}, + {<<"target">>, <<"local">>}, + {<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]}, + {<<"other_field">>, <<"some_value">>} + ]}, + Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1), + EJson2 = {[ + {<<"other_field">>, <<"unrelated">>}, + {<<"target">>, <<"local">>}, + {<<"source">>, <<"http://host.com/source_db">>}, + {<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]}, + {<<"other_field2">>, <<"unrelated2">>} + ]}, + Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2), + ?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2)) + end) + }. + -endif. |