summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2017-06-28 16:31:35 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2017-06-28 19:33:12 -0400
commit571a2fc0b7445df0f6ad4626f6172bfe6c8c9a7c (patch)
tree8964a8a3842e045971d5746a5cfd81429ced30f5
parentd23b26d4e8faf719fdf12f1a5f2e20c5adf9fa1b (diff)
downloadcouchdb-571a2fc0b7445df0f6ad4626f6172bfe6c8c9a7c.tar.gz
Ensure replicator _active_tasks entry reports recent pending changes value
Previously there was a race between reporting the source update sequence between the the workers and the changes readers. Each one used separate incrementing timestamp sequences. In some cases that lead to pending changes being stuck. For example, if changes reader reported the highest sequence with timestamp 10000, then later workers reported it with sequences 5000, 5001, 5002, then all those reports would be ignored and users would see an always lagging pending changes value reported with timestamp 1000. The fix is to thread the last_sequence update through the changes queue to the changes manager, so only its timestamp sequence will be used. This removes the race condition.
-rw-r--r--src/couch_replicator/src/couch_replicator_changes_reader.erl23
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl16
-rw-r--r--src/couch_replicator/src/couch_replicator_worker.erl4
3 files changed, 28 insertions, 15 deletions
diff --git a/src/couch_replicator/src/couch_replicator_changes_reader.erl b/src/couch_replicator/src/couch_replicator_changes_reader.erl
index 8ab92625f..2526618d4 100644
--- a/src/couch_replicator/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator/src/couch_replicator_changes_reader.erl
@@ -16,7 +16,7 @@
-export([start_link/4]).
% Exported for code reloading
--export([read_changes/6]).
+-export([read_changes/5]).
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator_api_wrap.hrl").
@@ -32,29 +32,29 @@ start_link(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
put(last_seq, StartSeq),
put(retries_left, Db#httpdb.retries),
?MODULE:read_changes(Parent, StartSeq, Db#httpdb{retries = 0},
- ChangesQueue, Options, 1)
+ ChangesQueue, Options)
end)};
start_link(StartSeq, Db, ChangesQueue, Options) ->
Parent = self(),
{ok, spawn_link(fun() ->
- ?MODULE:read_changes(Parent, StartSeq, Db, ChangesQueue, Options, 1)
+ ?MODULE:read_changes(Parent, StartSeq, Db, ChangesQueue, Options)
end)}.
-read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
+read_changes(Parent, StartSeq, Db, ChangesQueue, Options) ->
Continuous = couch_util:get_value(continuous, Options),
try
couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
fun(Item) ->
- process_change(Item, {Parent, Db, ChangesQueue, Continuous, Ts})
+ process_change(Item, {Parent, Db, ChangesQueue, Continuous})
end, Options),
couch_work_queue:close(ChangesQueue)
catch
throw:recurse ->
LS = get(last_seq),
- read_changes(Parent, LS, Db, ChangesQueue, Options, Ts+1);
+ read_changes(Parent, LS, Db, ChangesQueue, Options);
throw:retry_no_limit ->
LS = get(last_seq),
- read_changes(Parent, LS, Db, ChangesQueue, Options, Ts);
+ read_changes(Parent, LS, Db, ChangesQueue, Options);
throw:{retry_limit, Error} ->
couch_stats:increment_counter(
[couch_replicator, changes_read_failures]
@@ -75,7 +75,7 @@ read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
" with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
Db
end,
- read_changes(Parent, LastSeq, Db2, ChangesQueue, Options, Ts);
+ read_changes(Parent, LastSeq, Db2, ChangesQueue, Options);
_ ->
exit(Error)
end
@@ -89,7 +89,7 @@ process_change(#doc_info{id = <<>>} = DocInfo, {_, Db, _, _, _}) ->
"source database `~s` (_changes sequence ~p)",
[couch_replicator_api_wrap:db_uri(Db), DocInfo#doc_info.high_seq]);
-process_change(#doc_info{id = Id} = DocInfo, {Parent, Db, ChangesQueue, _, _}) ->
+process_change(#doc_info{id = Id} = DocInfo, {Parent, Db, ChangesQueue, _}) ->
case is_doc_id_too_long(byte_size(Id)) of
true ->
SourceDb = couch_replicator_api_wrap:db_uri(Db),
@@ -102,14 +102,13 @@ process_change(#doc_info{id = Id} = DocInfo, {Parent, Db, ChangesQueue, _, _}) -
put(last_seq, DocInfo#doc_info.high_seq)
end;
-process_change({last_seq, LS}, {Parent, _, _, true = _Continuous, Ts}) ->
+process_change({last_seq, LS}, {_Parent, _, ChangesQueue, true = _Continuous}) ->
% LS should never be undefined, but it doesn't hurt to be defensive inside
% the replicator.
Seq = case LS of undefined -> get(last_seq); _ -> LS end,
OldSeq = get(last_seq),
if Seq == OldSeq -> ok; true ->
- Msg = {report_seq_done, {Ts, Seq}, couch_replicator_stats:new()},
- ok = gen_server:call(Parent, Msg, infinity)
+ ok = couch_work_queue:queue(ChangesQueue, {last_seq, Seq})
end,
put(last_seq, Seq),
throw(recurse);
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 3253ce526..66bfdbff7 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -652,9 +652,19 @@ changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
closed ->
From ! {closed, self()};
- {ok, Changes} ->
- #doc_info{high_seq = Seq} = lists:last(Changes),
- ReportSeq = {Ts, Seq},
+ {ok, ChangesOrLastSeqs} ->
+ ReportSeq = case lists:last(ChangesOrLastSeqs) of
+ {last_seq, Seq} ->
+ {Ts, Seq};
+ #doc_info{high_seq = Seq} ->
+ {Ts, Seq}
+ end,
+ Changes = lists:filter(
+ fun(#doc_info{}) ->
+ true;
+ ({last_seq, _Seq}) ->
+ false
+ end, ChangesOrLastSeqs),
ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
From ! {changes, self(), Changes, ReportSeq}
end,
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index 1907879c6..b52640d5d 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -219,6 +219,10 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
receive
{closed, ChangesManager} ->
ok;
+ {changes, ChangesManager, [], ReportSeq} ->
+ Stats = couch_replicator_stats:new(),
+ ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
+ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager);
{changes, ChangesManager, Changes, ReportSeq} ->
Target2 = open_db(Target),
{IdRevs, Stats0} = find_missing(Changes, Target2),