summaryrefslogtreecommitdiff
path: root/src/couch_jobs/src/couch_jobs_type_monitor.erl
blob: 562a866da66bcf2a55895769002a9df1ed9a234a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_jobs_type_monitor).


-export([
    start/4
]).


-include("couch_jobs.hrl").


% State carried through the monitor process loop.
-record(st, {
    % Transaction context returned by couch_jobs_fdb:get_jtx/0; it is
    % obtained inside the spawned monitor process.
    jtx,
    % Job type handed to couch_jobs_fdb:get_activity_vs_and_watch/2.
    type,
    % Most recently observed activity VS for the type; seeded by the caller
    % of start/4 and replaced whenever a different value is read.
    vs,
    % Pid of the process that called start/4; receives {type_updated, VS}.
    parent,
    % erlang:system_time(millisecond) recorded by notify/1; starts at 0.
    timestamp,
    % Minimum spacing between notifications, compared against millisecond
    % timestamps in notify/1.
    holdoff,
    % Passed as the {timeout, Timeout} option to erlfdb:wait/2.
    timeout
}).


% Spawn a monitor process linked to the caller and return its pid.
%
% Type    - job type whose activity is watched
% VS      - activity value the caller already knows about; the caller is
%           only notified once a different value is observed
% HoldOff - minimum spacing between notifications (milliseconds)
% Timeout - timeout option used when waiting on the watch
%
% The caller receives {type_updated, NewVS} messages from the monitor.
start(Type, VS, HoldOff, Timeout) ->
    Owner = self(),
    Init = fun() ->
        % The transaction context is acquired in the new process, not in
        % the caller.
        JTx = couch_jobs_fdb:get_jtx(),
        InitialSt = #st{
            jtx = JTx,
            type = Type,
            vs = VS,
            parent = Owner,
            timestamp = 0,
            holdoff = HoldOff,
            timeout = Timeout
        },
        loop(InitialSt)
    end,
    spawn_link(Init).


% Main loop of the monitor process; it never returns.
%
% Each pass reads the current activity VS for the type together with a fresh
% watch. A value that differs from the remembered one is stored in the state
% and reported through notify/1. The process then blocks on the watch (see
% await_watch/2) before starting the next pass.
loop(#st{vs = VS, timeout = Timeout} = St) ->
    {NextSt, Watch} = case get_vs_and_watch(St) of
        % VS is already bound, so this clause only matches an unchanged value.
        {VS, SameW} -> {St, SameW};
        {NewVS, NewW} -> {notify(St#st{vs = NewVS}), NewW}
    end,
    await_watch(Watch, Timeout),
    loop(NextSt).


% Block until Watch fires or Timeout elapses.
%
% An erlfdb error 1009 (future_version in FoundationDB's error code table)
% and a wait timeout are both expected here: the watch is cancelled and the
% caller simply goes around the loop again. Any other exception propagates.
await_watch(Watch, Timeout) ->
    try
        erlfdb:wait(Watch, [{timeout, Timeout}])
    catch
        error:{erlfdb_error, 1009} ->
            erlfdb:cancel(Watch, [flush]);
        error:{timeout, _} ->
            erlfdb:cancel(Watch, [flush])
    end,
    ok.


% Send {type_updated, VS} to the parent, rate-limited by the hold-off.
%
% When fewer than `holdoff` milliseconds have elapsed since the previous
% notification, the process sleeps for the remainder before sending.
% Returns the state with `timestamp` set to the time the message was sent.
notify(#st{} = St) ->
    #st{holdoff = HoldOff, parent = Pid, timestamp = Ts, vs = VS} = St,
    case erlang:system_time(millisecond) - Ts of
        Dt when Dt < HoldOff ->
            % System time may step backwards, which makes Dt negative; never
            % sleep for longer than one full hold-off period.
            timer:sleep(min(HoldOff - Dt, HoldOff));
        _ ->
            ok
    end,
    Pid ! {type_updated, VS},
    % Sample the clock after the optional sleep. Recording the pre-sleep time
    % would make the next hold-off start from a stale timestamp and allow two
    % notifications to be sent closer together than HoldOff.
    St#st{timestamp = erlang:system_time(millisecond)}.


% Run couch_jobs_fdb:get_activity_vs_and_watch/2 for this monitor's job type
% through couch_jobs_fdb:tx/2 and return its result; loop/1 expects a
% {VS, Watch} pair.
get_vs_and_watch(#st{} = St) ->
    #st{jtx = JTx, type = Type} = St,
    Fetch = fun(Tx) ->
        couch_jobs_fdb:get_activity_vs_and_watch(Tx, Type)
    end,
    couch_jobs_fdb:tx(JTx, Fetch).