summaryrefslogtreecommitdiff
path: root/src/couch_jobs/src/couch_jobs_notifier.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_jobs/src/couch_jobs_notifier.erl')
-rw-r--r--src/couch_jobs/src/couch_jobs_notifier.erl163
1 files changed, 110 insertions, 53 deletions
diff --git a/src/couch_jobs/src/couch_jobs_notifier.erl b/src/couch_jobs/src/couch_jobs_notifier.erl
index 99581cb79..b47834f2f 100644
--- a/src/couch_jobs/src/couch_jobs_notifier.erl
+++ b/src/couch_jobs/src/couch_jobs_notifier.erl
@@ -33,11 +33,14 @@
-include("couch_jobs.hrl").
+-include_lib("kernel/include/logger.hrl").
--define(TYPE_MONITOR_HOLDOFF_DEFAULT, 50).
+-define(TYPE_MONITOR_HOLDOFF_DEFAULT, "50").
-define(TYPE_MONITOR_TIMEOUT_DEFAULT, "infinity").
--define(GET_JOBS_RANGE_RATIO, 0.5).
+-define(INIT_BATCH_SIZE, "1000").
+-define(BATCH_FACTOR, "0.75").
+-define(BATCH_INCREMENT, "100").
-record(st, {
@@ -46,7 +49,8 @@
monitor_pid,
subs, % #{JobId => #{Ref => {Pid, State, Seq}}}
pidmap, % #{{Jobid, Pid} => Ref}
- refmap % #{Ref => JobId}
+ refmap, % #{Ref => JobId}
+ batch_size
}).
@@ -76,7 +80,8 @@ init([Type]) ->
type = Type,
subs = #{},
pidmap = #{},
- refmap = #{}
+ refmap = #{},
+ batch_size = init_batch_size()
},
VS = get_type_vs(St),
HoldOff = get_holdoff(),
@@ -121,6 +126,10 @@ handle_info({type_updated, VS}, St) ->
handle_info({Ref, ready}, St) when is_reference(Ref) ->
% Don't crash out couch_jobs_server and the whole application would need to
% eventually do proper cleanup in erlfdb:wait timeout code.
+ ?LOG_ERROR(#{
+ what => spurious_future_ready,
+ ref => Ref
+ }),
LogMsg = "~p : spurious erlfdb future ready message ~p",
couch_log:error(LogMsg, [?MODULE, Ref]),
{noreply, St};
@@ -199,31 +208,44 @@ flush_type_updated_messages(VSMax) ->
end.
-get_jobs(#st{jtx = JTx, type = Type}, InactiveIdMap, Ratio)
- when Ratio >= ?GET_JOBS_RANGE_RATIO ->
- Filter = fun(JobId) -> maps:is_key(JobId, InactiveIdMap) end,
- JobMap = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
- couch_jobs_fdb:get_jobs(JTx1, Type, Filter)
- end),
- maps:map(fun(JobId, _) ->
- case maps:is_key(JobId, JobMap) of
- true -> maps:get(JobId, JobMap);
- false -> {null, not_found, not_found}
- end
- end, InactiveIdMap);
-
-get_jobs(#st{jtx = JTx, type = Type}, InactiveIdMap, _) ->
- couch_jobs_fdb:tx(JTx, fun(JTx1) ->
- maps:map(fun(JobId, _) ->
- Job = #{job => true, type => Type, id => JobId},
- case couch_jobs_fdb:get_job_state_and_data(JTx1, Job) of
- {ok, Seq, State, Data} ->
- {Seq, State, Data};
- {error, not_found} ->
- {null, not_found, not_found}
- end
- end, InactiveIdMap)
- end).
+get_jobs(#st{} = St, Ids) when is_list(Ids) ->
+ #st{jtx = JTx, type = Type, batch_size = BatchSize} = St,
+ {Jobs, NewBatchSize} = get_jobs_iter(JTx, Type, Ids, BatchSize, #{}),
+ {Jobs, St#st{batch_size = NewBatchSize}}.
+
+
+get_jobs_iter(_Jtx, _Type, [], BatchSize, #{} = Acc) ->
+ {Acc, BatchSize};
+
+get_jobs_iter(JTx, Type, Ids, BatchSize, #{} = Acc0) ->
+ {BatchIds, RestIds} = case length(Ids) < BatchSize of
+ true -> {Ids, []};
+ false -> lists:split(BatchSize, Ids)
+ end,
+ Result = try
+ couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ lists:foldl(fun(JobId, #{} = Acc) ->
+ Job = #{job => true, type => Type, id => JobId},
+ case couch_jobs_fdb:get_job_state_and_data(JTx1, Job) of
+ {ok, Seq, State, Data} ->
+ Acc#{JobId => {Seq, State, Data}};
+ {error, not_found} ->
+ Acc#{JobId => {null, not_found, not_found}}
+ end
+ end, Acc0, BatchIds)
+ end)
+ catch
+ error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
+ failed
+ end,
+ case Result of
+ #{} = AccF ->
+ NewBatchSize = BatchSize + batch_increment(),
+ get_jobs_iter(JTx, Type, RestIds, NewBatchSize, AccF);
+ failed ->
+ NewBatchSize = max(1, round(BatchSize * batch_factor())),
+ get_jobs_iter(JTx, Type, Ids, NewBatchSize, Acc0)
+ end.
get_type_vs(#st{jtx = JTx, type = Type}) ->
@@ -236,24 +258,48 @@ get_type_vs(#st{jtx = JTx, type = Type}) ->
% and updated at least once since the given versionstamp. These are relatively
% cheap to find as it's just a range read in the ?ACTIVITY subspace.
%
-get_active_since(#st{} = _St, not_found) ->
- #{};
-
-get_active_since(#st{jtx = JTx, type = Type, subs = Subs}, VS) ->
- AllUpdated = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
- couch_jobs_fdb:get_active_since(JTx1, Type, VS)
- end),
- maps:map(fun(_JobId, Data) ->
+get_active_since(#st{} = St, not_found) ->
+ {#{}, St};
+
+get_active_since(#st{} = St, VS) ->
+ #st{jtx = JTx, type = Type, subs = Subs, batch_size = BatchSize} = St,
+ {Updated, NewBatchSize} = get_active_iter(JTx, Type, VS, BatchSize, #{}),
+ UpdatedSubs = maps:map(fun(_JobId, Data) ->
{VS, running, Data}
- end, maps:with(maps:keys(Subs), AllUpdated)).
+ end, maps:with(maps:keys(Subs), Updated)),
+ {UpdatedSubs, St#st{batch_size = NewBatchSize}}.
+
+
+get_active_iter(JTx, Type, VS, BatchSize, #{} = Acc) ->
+ Opts = [{limit, BatchSize}],
+ Result = try
+ couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_active_since(JTx1, Type, VS, Opts)
+ end)
+ catch
+ error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
+ failed
+ end,
+ case Result of
+ {Updated, _FinalSeq} when map_size(Updated) < BatchSize ->
+ {maps:merge(Acc, Updated), BatchSize};
+ {Updated, FinalSeq} when map_size(Updated) >= BatchSize ->
+ Acc1 = maps:merge(Acc, Updated),
+ NewBatchSize = BatchSize + batch_increment(),
+ NextSeq = fabric2_fdb:next_vs(FinalSeq),
+ get_active_iter(JTx, Type, NextSeq, NewBatchSize, Acc1);
+ failed ->
+ NewBatchSize = max(1, round(BatchSize * batch_factor())),
+ get_active_iter(JTx, Type, VS, NewBatchSize, Acc)
+ end.
try_notify_subscribers(ActiveVS, #st{} = St) ->
try
notify_subscribers(ActiveVS, St)
catch
- error:{timeout, _} -> try_notify_subscribers(ActiveVS, St);
- error:{erlfdb_error, 1031} -> try_notify_subscribers(ActiveVS, St)
+ error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) ->
+ try_notify_subscribers(ActiveVS, St)
end.
@@ -263,14 +309,13 @@ notify_subscribers(_, #st{subs = Subs} = St) when map_size(Subs) =:= 0 ->
notify_subscribers(ActiveVS, #st{} = St1) ->
% First gather the easy (cheap) active jobs. Then with those out of way
% inspect each job to get its state.
- Active = get_active_since(St1, ActiveVS),
- St2 = notify_job_ids(Active, St1),
+ {Active, St2} = get_active_since(St1, ActiveVS),
+ St3 = notify_job_ids(Active, St2),
ActiveIds = maps:keys(Active),
- Subs = St2#st.subs,
- InactiveIdMap = maps:without(ActiveIds, Subs),
- InactiveRatio = maps:size(InactiveIdMap) / maps:size(Subs),
- Inactive = get_jobs(St2, InactiveIdMap, InactiveRatio),
- notify_job_ids(Inactive, St2).
+ Subs = St3#st.subs,
+ InactiveIds = maps:keys(maps:without(ActiveIds, Subs)),
+ {Inactive, St4} = get_jobs(St3, InactiveIds),
+ notify_job_ids(Inactive, St4).
notify_job_ids(#{} = Jobs, #st{type = Type} = St0) ->
@@ -302,13 +347,25 @@ notify(Pid, Ref, Type, Id, State, Data) ->
get_holdoff() ->
- config:get_integer("couch_jobs", "type_monitor_holdoff_msec",
+ couch_jobs_util:get_non_neg_int(type_monitor_holdoff_msec,
?TYPE_MONITOR_HOLDOFF_DEFAULT).
get_timeout() ->
- Default = ?TYPE_MONITOR_TIMEOUT_DEFAULT,
- case config:get("couch_jobs", "type_monitor_timeout_msec", Default) of
- "infinity" -> infinity;
- Milliseconds -> list_to_integer(Milliseconds)
- end.
+ couch_jobs_util:get_timeout(type_monitor_timeout_msec,
+ ?TYPE_MONITOR_TIMEOUT_DEFAULT).
+
+
+init_batch_size() ->
+ couch_jobs_util:get_non_neg_int(notifier_init_batch_size,
+ ?INIT_BATCH_SIZE).
+
+
+batch_increment() ->
+ couch_jobs_util:get_non_neg_int(notifier_batch_increment,
+ ?BATCH_INCREMENT).
+
+
+batch_factor() ->
+ couch_jobs_util:get_float_0_1(notifier_batch_factor,
+ ?BATCH_FACTOR).