diff options
Diffstat (limited to 'src/couch_jobs/src/couch_jobs_type_monitor.erl')
-rw-r--r-- | src/couch_jobs/src/couch_jobs_type_monitor.erl | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/src/couch_jobs/src/couch_jobs_type_monitor.erl b/src/couch_jobs/src/couch_jobs_type_monitor.erl new file mode 100644 index 000000000..562a866da --- /dev/null +++ b/src/couch_jobs/src/couch_jobs_type_monitor.erl @@ -0,0 +1,84 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_type_monitor). + + +-export([ + start/4 +]). + + +-include("couch_jobs.hrl"). + + +-record(st, { + jtx, + type, + vs, + parent, + timestamp, + holdoff, + timeout +}). + + +start(Type, VS, HoldOff, Timeout) -> + Parent = self(), + spawn_link(fun() -> + loop(#st{ + jtx = couch_jobs_fdb:get_jtx(), + type = Type, + vs = VS, + parent = Parent, + timestamp = 0, + holdoff = HoldOff, + timeout = Timeout + }) + end). + + +loop(#st{vs = VS, timeout = Timeout} = St) -> + {St1, Watch} = case get_vs_and_watch(St) of + {VS1, W} when VS1 =/= VS -> {notify(St#st{vs = VS1}), W}; + {VS, W} -> {St, W} + end, + try + erlfdb:wait(Watch, [{timeout, Timeout}]) + catch + error:{erlfdb_error, 1009} -> + erlfdb:cancel(Watch, [flush]), + ok; + error:{timeout, _} -> + erlfdb:cancel(Watch, [flush]), + ok + end, + loop(St1). + + +notify(#st{} = St) -> + #st{holdoff = HoldOff, parent = Pid, timestamp = Ts, vs = VS} = St, + Now = erlang:system_time(millisecond), + case Now - Ts of + Dt when Dt < HoldOff -> + timer:sleep(max(HoldOff - Dt, 0)); + _ -> + ok + end, + Pid ! {type_updated, VS}, + St#st{timestamp = Now}. + + +get_vs_and_watch(#st{jtx = JTx, type = Type}) -> + couch_jobs_fdb:tx(JTx, fun(JTx1) -> + couch_jobs_fdb:get_activity_vs_and_watch(JTx1, Type) + end). |