diff options
Diffstat (limited to 'src/couch_jobs/src/couch_jobs_notifier.erl')
-rw-r--r-- | src/couch_jobs/src/couch_jobs_notifier.erl | 163 |
1 files changed, 110 insertions, 53 deletions
diff --git a/src/couch_jobs/src/couch_jobs_notifier.erl b/src/couch_jobs/src/couch_jobs_notifier.erl index 99581cb79..b47834f2f 100644 --- a/src/couch_jobs/src/couch_jobs_notifier.erl +++ b/src/couch_jobs/src/couch_jobs_notifier.erl @@ -33,11 +33,14 @@ -include("couch_jobs.hrl"). +-include_lib("kernel/include/logger.hrl"). --define(TYPE_MONITOR_HOLDOFF_DEFAULT, 50). +-define(TYPE_MONITOR_HOLDOFF_DEFAULT, "50"). -define(TYPE_MONITOR_TIMEOUT_DEFAULT, "infinity"). --define(GET_JOBS_RANGE_RATIO, 0.5). +-define(INIT_BATCH_SIZE, "1000"). +-define(BATCH_FACTOR, "0.75"). +-define(BATCH_INCREMENT, "100"). -record(st, { @@ -46,7 +49,8 @@ monitor_pid, subs, % #{JobId => #{Ref => {Pid, State, Seq}}} pidmap, % #{{Jobid, Pid} => Ref} - refmap % #{Ref => JobId} + refmap, % #{Ref => JobId} + batch_size }). @@ -76,7 +80,8 @@ init([Type]) -> type = Type, subs = #{}, pidmap = #{}, - refmap = #{} + refmap = #{}, + batch_size = init_batch_size() }, VS = get_type_vs(St), HoldOff = get_holdoff(), @@ -121,6 +126,10 @@ handle_info({type_updated, VS}, St) -> handle_info({Ref, ready}, St) when is_reference(Ref) -> % Don't crash out couch_jobs_server and the whole application would need to % eventually do proper cleanup in erlfdb:wait timeout code. + ?LOG_ERROR(#{ + what => spurious_future_ready, + ref => Ref + }), LogMsg = "~p : spurious erlfdb future ready message ~p", couch_log:error(LogMsg, [?MODULE, Ref]), {noreply, St}; @@ -199,31 +208,44 @@ flush_type_updated_messages(VSMax) -> end. -get_jobs(#st{jtx = JTx, type = Type}, InactiveIdMap, Ratio) - when Ratio >= ?GET_JOBS_RANGE_RATIO -> - Filter = fun(JobId) -> maps:is_key(JobId, InactiveIdMap) end, - JobMap = couch_jobs_fdb:tx(JTx, fun(JTx1) -> - couch_jobs_fdb:get_jobs(JTx1, Type, Filter) - end), - maps:map(fun(JobId, _) -> - case maps:is_key(JobId, JobMap) of - true -> maps:get(JobId, JobMap); - false -> {null, not_found, not_found} - end - end, InactiveIdMap); - -get_jobs(#st{jtx = JTx, type = Type}, InactiveIdMap, _) -> - couch_jobs_fdb:tx(JTx, fun(JTx1) -> - maps:map(fun(JobId, _) -> - Job = #{job => true, type => Type, id => JobId}, - case couch_jobs_fdb:get_job_state_and_data(JTx1, Job) of - {ok, Seq, State, Data} -> - {Seq, State, Data}; - {error, not_found} -> - {null, not_found, not_found} - end - end, InactiveIdMap) - end). +get_jobs(#st{} = St, Ids) when is_list(Ids) -> + #st{jtx = JTx, type = Type, batch_size = BatchSize} = St, + {Jobs, NewBatchSize} = get_jobs_iter(JTx, Type, Ids, BatchSize, #{}), + {Jobs, St#st{batch_size = NewBatchSize}}. + + +get_jobs_iter(_Jtx, _Type, [], BatchSize, #{} = Acc) -> + {Acc, BatchSize}; + +get_jobs_iter(JTx, Type, Ids, BatchSize, #{} = Acc0) -> + {BatchIds, RestIds} = case length(Ids) < BatchSize of + true -> {Ids, []}; + false -> lists:split(BatchSize, Ids) + end, + Result = try + couch_jobs_fdb:tx(JTx, fun(JTx1) -> + lists:foldl(fun(JobId, #{} = Acc) -> + Job = #{job => true, type => Type, id => JobId}, + case couch_jobs_fdb:get_job_state_and_data(JTx1, Job) of + {ok, Seq, State, Data} -> + Acc#{JobId => {Seq, State, Data}}; + {error, not_found} -> + Acc#{JobId => {null, not_found, not_found}} + end + end, Acc0, BatchIds) + end) + catch + error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) -> + failed + end, + case Result of + #{} = AccF -> + NewBatchSize = BatchSize + batch_increment(), + get_jobs_iter(JTx, Type, RestIds, NewBatchSize, AccF); + failed -> + NewBatchSize = max(1, round(BatchSize * batch_factor())), + get_jobs_iter(JTx, Type, Ids, NewBatchSize, Acc0) + end. get_type_vs(#st{jtx = JTx, type = Type}) -> @@ -236,24 +258,48 @@ get_type_vs(#st{jtx = JTx, type = Type}) -> % and updated at least once since the given versionstamp. These are relatively % cheap to find as it's just a range read in the ?ACTIVITY subspace. % -get_active_since(#st{} = _St, not_found) -> - #{}; - -get_active_since(#st{jtx = JTx, type = Type, subs = Subs}, VS) -> - AllUpdated = couch_jobs_fdb:tx(JTx, fun(JTx1) -> - couch_jobs_fdb:get_active_since(JTx1, Type, VS) - end), - maps:map(fun(_JobId, Data) -> +get_active_since(#st{} = St, not_found) -> + {#{}, St}; + +get_active_since(#st{} = St, VS) -> + #st{jtx = JTx, type = Type, subs = Subs, batch_size = BatchSize} = St, + {Updated, NewBatchSize} = get_active_iter(JTx, Type, VS, BatchSize, #{}), + UpdatedSubs = maps:map(fun(_JobId, Data) -> {VS, running, Data} - end, maps:with(maps:keys(Subs), AllUpdated)). + end, maps:with(maps:keys(Subs), Updated)), + {UpdatedSubs, St#st{batch_size = NewBatchSize}}. + + +get_active_iter(JTx, Type, VS, BatchSize, #{} = Acc) -> + Opts = [{limit, BatchSize}], + Result = try + couch_jobs_fdb:tx(JTx, fun(JTx1) -> + couch_jobs_fdb:get_active_since(JTx1, Type, VS, Opts) + end) + catch + error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) -> + failed + end, + case Result of + {Updated, _FinalSeq} when map_size(Updated) < BatchSize -> + {maps:merge(Acc, Updated), BatchSize}; + {Updated, FinalSeq} when map_size(Updated) >= BatchSize -> + Acc1 = maps:merge(Acc, Updated), + NewBatchSize = BatchSize + batch_increment(), + NextSeq = fabric2_fdb:next_vs(FinalSeq), + get_active_iter(JTx, Type, NextSeq, NewBatchSize, Acc1); + failed -> + NewBatchSize = max(1, round(BatchSize * batch_factor())), + get_active_iter(JTx, Type, VS, NewBatchSize, Acc) + end. try_notify_subscribers(ActiveVS, #st{} = St) -> try notify_subscribers(ActiveVS, St) catch - error:{timeout, _} -> try_notify_subscribers(ActiveVS, St); - error:{erlfdb_error, 1031} -> try_notify_subscribers(ActiveVS, St) + error:{Tag, Err} when ?COUCH_JOBS_RETRYABLE(Tag, Err) -> + try_notify_subscribers(ActiveVS, St) end. @@ -263,14 +309,13 @@ notify_subscribers(_, #st{subs = Subs} = St) when map_size(Subs) =:= 0 -> notify_subscribers(ActiveVS, #st{} = St1) -> % First gather the easy (cheap) active jobs. Then with those out of way % inspect each job to get its state. - Active = get_active_since(St1, ActiveVS), - St2 = notify_job_ids(Active, St1), + {Active, St2} = get_active_since(St1, ActiveVS), + St3 = notify_job_ids(Active, St2), ActiveIds = maps:keys(Active), - Subs = St2#st.subs, - InactiveIdMap = maps:without(ActiveIds, Subs), - InactiveRatio = maps:size(InactiveIdMap) / maps:size(Subs), - Inactive = get_jobs(St2, InactiveIdMap, InactiveRatio), - notify_job_ids(Inactive, St2). + Subs = St3#st.subs, + InactiveIds = maps:keys(maps:without(ActiveIds, Subs)), + {Inactive, St4} = get_jobs(St3, InactiveIds), + notify_job_ids(Inactive, St4). notify_job_ids(#{} = Jobs, #st{type = Type} = St0) -> @@ -302,13 +347,25 @@ notify(Pid, Ref, Type, Id, State, Data) -> get_holdoff() -> - config:get_integer("couch_jobs", "type_monitor_holdoff_msec", + couch_jobs_util:get_non_neg_int(type_monitor_holdoff_msec, ?TYPE_MONITOR_HOLDOFF_DEFAULT). get_timeout() -> - Default = ?TYPE_MONITOR_TIMEOUT_DEFAULT, - case config:get("couch_jobs", "type_monitor_timeout_msec", Default) of - "infinity" -> infinity; - Milliseconds -> list_to_integer(Milliseconds) - end. + couch_jobs_util:get_timeout(type_monitor_timeout_msec, + ?TYPE_MONITOR_TIMEOUT_DEFAULT). + + +init_batch_size() -> + couch_jobs_util:get_non_neg_int(notifier_init_batch_size, + ?INIT_BATCH_SIZE). + + +batch_increment() -> + couch_jobs_util:get_non_neg_int(notifier_batch_increment, + ?BATCH_INCREMENT). + + +batch_factor() -> + couch_jobs_util:get_float_0_1(notifier_batch_factor, + ?BATCH_FACTOR). |