% Source path: src/couch_replicator/src/couch_replicator_changes_reader.erl
% (blob 97c72897166120b475c38967c95d68ecd0735fb7)
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_changes_reader).

% Public API
-export([start_link/4]).

% Exported for code reloading
-export([read_changes/5]).

-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
-include_lib("kernel/include/logger.hrl").


% Spawns a process, linked to the caller, that runs read_changes/5 starting
% at StartSeq. Returns {ok, Pid}. The calling process is handed to the reader
% as its Parent.
%
% For an #httpdb{} source with map options, the reader's process dictionary
% is seeded with the starting sequence (last_seq) and the retry budget
% (retries_left), and the record passed on has its retries field set to 0;
% read_changes/5 then consumes retries_left itself.
start_link(StartSeq, #httpdb{retries = Retries} = Db, ChangesQueue, #{} = Options) ->
    Owner = self(),
    Reader = fun() ->
        put(last_seq, StartSeq),
        put(retries_left, Retries),
        ?MODULE:read_changes(Owner, StartSeq, Db#httpdb{retries = 0},
            ChangesQueue, Options)
    end,
    {ok, spawn_link(Reader)};
% Any other source/options shape: start the reader without seeding the
% process dictionary.
start_link(StartSeq, Db, ChangesQueue, Options) ->
    Owner = self(),
    Reader = fun() ->
        ?MODULE:read_changes(Owner, StartSeq, Db, ChangesQueue, Options)
    end,
    {ok, spawn_link(Reader)}.

% Reads the changes feed of Db from StartSeq, passing every feed item to
% process_change/2, and closes ChangesQueue once the feed call returns
% normally.
%
% Restart/retry behaviour (all restarts resume from the last_seq value kept
% in the process dictionary):
%   * throw:recurse - thrown by process_change/2 after a last_seq item in
%     continuous mode; the feed is simply re-opened.
%   * throw:retry_no_limit - not thrown in this module (presumably raised
%     from couch_replicator_api_wrap); re-opened without touching the retry
%     budget.
%   * throw:{retry_limit, Error} - also not thrown in this module; counted
%     in the changes_read_failures stat and retried while the retries_left
%     budget in the process dictionary is a positive integer. Otherwise the
%     process exits with Error.
read_changes(Parent, StartSeq, Db, ChangesQueue, Options) ->
    Continuous = maps:get(<<"continuous">>, Options, false),
    try
        couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
            fun(Item) ->
                process_change(Item, {Parent, Db, ChangesQueue, Continuous})
            end, couch_replicator_utils:proplist_options(Options)),
        couch_work_queue:close(ChangesQueue)
    catch
        throw:recurse ->
            LS = get(last_seq),
            read_changes(Parent, LS, Db, ChangesQueue, Options);
        throw:retry_no_limit ->
            LS = get(last_seq),
            read_changes(Parent, LS, Db, ChangesQueue, Options);
        throw:{retry_limit, Error} ->
            couch_stats:increment_counter(
                [couch_replicator, changes_read_failures]
            ),
            case get(retries_left) of
                % is_integer/1 is required: in Erlang's term order every
                % atom compares greater than every number, so a bare
                % `N > 0` would also accept `undefined` (budget never set)
                % and then crash with badarith on `N - 1`, hiding Error.
                N when is_integer(N), N > 0 ->
                    put(retries_left, N - 1),
                    LastSeq = get(last_seq),
                    LogMsg = #{
                        what => retry_changes_feed,
                        in => replicator,
                        source => couch_replicator_api_wrap:db_uri(Db),
                        sequence => LastSeq,
                        retries_remaining => N
                    },
                    Db2 = case LastSeq of
                        StartSeq ->
                            % No progress since this attempt started: back
                            % off for Db#httpdb.wait milliseconds and double
                            % the wait used by the next attempt.
                            ?LOG_NOTICE(LogMsg#{delay_sec => Db#httpdb.wait / 1000}),
                            couch_log:notice("Retrying _changes request to source database ~s"
                                " with since=~p in ~p seconds",
                                [couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
                            ok = timer:sleep(Db#httpdb.wait),
                            Db#httpdb{wait = 2 * Db#httpdb.wait};
                        _ ->
                            % Some changes were processed before the
                            % failure: retry immediately, wait unchanged.
                            ?LOG_NOTICE(LogMsg),
                            couch_log:notice("Retrying _changes request to source database ~s"
                                " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
                            Db
                    end,
                    read_changes(Parent, LastSeq, Db2, ChangesQueue, Options);
                _ ->
                    % Budget exhausted (or never set): give up with the
                    % original error as the exit reason.
                    exit(Error)
            end
    end.


% Handles a single item delivered by the changes feed. The second argument
% is the {Parent, Db, ChangesQueue, Continuous} tuple built by
% read_changes/5.
%
%   * #doc_info{} with an empty ID: logged and ignored.
%   * #doc_info{} whose ID exceeds the configured maximum length: logged,
%     reported to Parent as one doc_write_failure, and not queued.
%   * any other #doc_info{}: queued on ChangesQueue, and its high_seq is
%     recorded as last_seq in the process dictionary.
%   * {last_seq, LS} in continuous mode: queued if the sequence advanced,
%     recorded as last_seq, then throws `recurse` so that read_changes/5
%     re-opens the feed.
%   * {last_seq, _} otherwise: ignored.
process_change(#doc_info{id = <<>>} = DocInfo, {_, Db, _, _}) ->
    % Previous CouchDB releases had a bug which allowed a doc with an empty ID
    % to be inserted into databases. Such doc is impossible to GET.
    ?LOG_ERROR(#{
        what => ignore_empty_docid,
        in => replicator,
        source => couch_replicator_api_wrap:db_uri(Db),
        sequence => DocInfo#doc_info.high_seq
    }),
    couch_log:error("Replicator: ignoring document with empty ID in "
        "source database `~s` (_changes sequence ~p)",
        [couch_replicator_api_wrap:db_uri(Db), DocInfo#doc_info.high_seq]);

process_change(#doc_info{id = Id} = DocInfo, {Parent, Db, ChangesQueue, _}) ->
    case is_doc_id_too_long(byte_size(Id)) of
        true ->
            SourceDb = couch_replicator_api_wrap:db_uri(Db),
            ?LOG_ERROR(#{
                what => doc_write_failure,
                in => replicator,
                source => SourceDb,
                docid => Id,
                details => "document ID too long"
            }),
            % `~.64s` (precision only) cuts the over-long ID to 64
            % characters without padding; the source URL is printed whole.
            couch_log:error("Replicator: document id `~.64s...` from source db "
                "`~s` is too long, ignoring.", [Id, SourceDb]),
            Stats = couch_replicator_stats:new([{doc_write_failures, 1}]),
            ok = gen_server:call(Parent, {add_stats, Stats}, infinity);
        false ->
            ok = couch_work_queue:queue(ChangesQueue, DocInfo),
            put(last_seq, DocInfo#doc_info.high_seq)
    end;

process_change({last_seq, LS}, {_Parent, _, ChangesQueue, true = _Continuous}) ->
    % LS should never be undefined, but it doesn't hurt to be defensive inside
    % the replicator.
    Seq = case LS of undefined -> get(last_seq); _ -> LS end,
    OldSeq = get(last_seq),
    % Only queue the sequence when it actually moved past the last one seen.
    if Seq == OldSeq -> ok; true ->
        ok = couch_work_queue:queue(ChangesQueue, {last_seq, Seq})
    end,
    put(last_seq, Seq),
    throw(recurse);

process_change({last_seq, _}, _) ->
    % This clause is unreachable today, but let's plan ahead for the future
    % where we checkpoint against last_seq instead of the sequence of the last
    % change.  The two can differ substantially in the case of a restrictive
    % filter.
    ok.

% Returns true when IdLength exceeds the "max_document_id_length" setting of
% the "replicator" config section. The setting defaults to "infinity", which
% disables the check, as does any value that is not a positive integer.
% A value that is not an integer string makes list_to_integer/1 raise.
is_doc_id_too_long(IdLength) ->
    case config:get("replicator", "max_document_id_length", "infinity") of
        "infinity" ->
            false;
        LimitStr ->
            case list_to_integer(LimitStr) of
                Limit when Limit > 0 -> IdLength > Limit;
                _NonPositive -> false
            end
    end.