diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2017-06-28 16:31:35 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2017-06-28 19:33:12 -0400 |
commit | 571a2fc0b7445df0f6ad4626f6172bfe6c8c9a7c (patch) | |
tree | 8964a8a3842e045971d5746a5cfd81429ced30f5 | |
parent | d23b26d4e8faf719fdf12f1a5f2e20c5adf9fa1b (diff) | |
download | couchdb-571a2fc0b7445df0f6ad4626f6172bfe6c8c9a7c.tar.gz |
Ensure replicator _active_tasks entry reports recent pending changes value
Previously there was a race between reporting the source update sequence
between the the workers and the changes readers. Each one used separate
incrementing timestamp sequences.
In some cases that lead to pending changes being stuck. For example, if changes
reader reported the highest sequence with timestamp 10000, then later workers
reported it with sequences 5000, 5001, 5002, then all those reports would be
ignored and users would see an always lagging pending changes value reported
with timestamp 1000.
The fix is to thread the last_sequence update through the changes queue to
the changes manager, so only its timestamp sequence will be used. This removes
the race condition.
3 files changed, 28 insertions, 15 deletions
diff --git a/src/couch_replicator/src/couch_replicator_changes_reader.erl b/src/couch_replicator/src/couch_replicator_changes_reader.erl index 8ab92625f..2526618d4 100644 --- a/src/couch_replicator/src/couch_replicator_changes_reader.erl +++ b/src/couch_replicator/src/couch_replicator_changes_reader.erl @@ -16,7 +16,7 @@ -export([start_link/4]). % Exported for code reloading --export([read_changes/6]). +-export([read_changes/5]). -include_lib("couch/include/couch_db.hrl"). -include("couch_replicator_api_wrap.hrl"). @@ -32,29 +32,29 @@ start_link(StartSeq, #httpdb{} = Db, ChangesQueue, Options) -> put(last_seq, StartSeq), put(retries_left, Db#httpdb.retries), ?MODULE:read_changes(Parent, StartSeq, Db#httpdb{retries = 0}, - ChangesQueue, Options, 1) + ChangesQueue, Options) end)}; start_link(StartSeq, Db, ChangesQueue, Options) -> Parent = self(), {ok, spawn_link(fun() -> - ?MODULE:read_changes(Parent, StartSeq, Db, ChangesQueue, Options, 1) + ?MODULE:read_changes(Parent, StartSeq, Db, ChangesQueue, Options) end)}. -read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) -> +read_changes(Parent, StartSeq, Db, ChangesQueue, Options) -> Continuous = couch_util:get_value(continuous, Options), try couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq, fun(Item) -> - process_change(Item, {Parent, Db, ChangesQueue, Continuous, Ts}) + process_change(Item, {Parent, Db, ChangesQueue, Continuous}) end, Options), couch_work_queue:close(ChangesQueue) catch throw:recurse -> LS = get(last_seq), - read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1); + read_changes(Parent, LS, Db, ChangesQueue, Options); throw:retry_no_limit -> LS = get(last_seq), - read_changes(Parent, LS, Db, ChangesQueue, Options, Ts); + read_changes(Parent, LS, Db, ChangesQueue, Options); throw:{retry_limit, Error} -> couch_stats:increment_counter( [couch_replicator, changes_read_failures] @@ -75,7 +75,7 @@ read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) -> " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]), Db end, - read_changes(Parent, LastSeq, Db2, ChangesQueue, Options, Ts); + read_changes(Parent, LastSeq, Db2, ChangesQueue, Options); _ -> exit(Error) end @@ -89,7 +89,7 @@ process_change(#doc_info{id = <<>>} = DocInfo, {_, Db, _, _, _}) -> "source database `~s` (_changes sequence ~p)", [couch_replicator_api_wrap:db_uri(Db), DocInfo#doc_info.high_seq]); -process_change(#doc_info{id = Id} = DocInfo, {Parent, Db, ChangesQueue, _, _}) -> +process_change(#doc_info{id = Id} = DocInfo, {Parent, Db, ChangesQueue, _}) -> case is_doc_id_too_long(byte_size(Id)) of true -> SourceDb = couch_replicator_api_wrap:db_uri(Db), @@ -102,14 +102,13 @@ process_change(#doc_info{id = Id} = DocInfo, {Parent, Db, ChangesQueue, _, _}) - put(last_seq, DocInfo#doc_info.high_seq) end; -process_change({last_seq, LS}, {Parent, _, _, true = _Continuous, Ts}) -> +process_change({last_seq, LS}, {_Parent, _, ChangesQueue, true = _Continuous}) -> % LS should never be undefined, but it doesn't hurt to be defensive inside % the replicator. Seq = case LS of undefined -> get(last_seq); _ -> LS end, OldSeq = get(last_seq), if Seq == OldSeq -> ok; true -> - Msg = {report_seq_done, {Ts, Seq}, couch_replicator_stats:new()}, - ok = gen_server:call(Parent, Msg, infinity) + ok = couch_work_queue:queue(ChangesQueue, {last_seq, Seq}) end, put(last_seq, Seq), throw(recurse); diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 3253ce526..66bfdbff7 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -652,9 +652,19 @@ changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) -> case couch_work_queue:dequeue(ChangesQueue, BatchSize) of closed -> From ! {closed, self()}; - {ok, Changes} -> - #doc_info{high_seq = Seq} = lists:last(Changes), - ReportSeq = {Ts, Seq}, + {ok, ChangesOrLastSeqs} -> + ReportSeq = case lists:last(ChangesOrLastSeqs) of + {last_seq, Seq} -> + {Ts, Seq}; + #doc_info{high_seq = Seq} -> + {Ts, Seq} + end, + Changes = lists:filter( + fun(#doc_info{}) -> + true; + ({last_seq, _Seq}) -> + false + end, ChangesOrLastSeqs), ok = gen_server:cast(Parent, {report_seq, ReportSeq}), From ! {changes, self(), Changes, ReportSeq} end, diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index 1907879c6..b52640d5d 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -219,6 +219,10 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> receive {closed, ChangesManager} -> ok; + {changes, ChangesManager, [], ReportSeq} -> + Stats = couch_replicator_stats:new(), + ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity), + queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager); {changes, ChangesManager, Changes, ReportSeq} -> Target2 = open_db(Target), {IdRevs, Stats0} = find_missing(Changes, Target2), |