summaryrefslogtreecommitdiff
path: root/src/ken/src/ken_server.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ken/src/ken_server.erl')
-rw-r--r--src/ken/src/ken_server.erl464
1 files changed, 252 insertions, 212 deletions
diff --git a/src/ken/src/ken_server.erl b/src/ken/src/ken_server.erl
index b33d01f35..9f869b379 100644
--- a/src/ken/src/ken_server.erl
+++ b/src/ken/src/ken_server.erl
@@ -32,8 +32,10 @@
-export([update_db_indexes/2]).
-record(job, {
- name, % {DbName, GroupId} for view. {DbName, DDocId, IndexId} for search.
- server, % Pid of either view group or search index
+ % {DbName, GroupId} for view. {DbName, DDocId, IndexId} for search.
+ name,
+ % Pid of either view group or search index
+ server,
worker_pid = nil,
seq = 0,
lru = erlang:monotonic_time()
@@ -78,11 +80,15 @@ remove(DbName) ->
add_all_shards(DbName) ->
try
Shards = mem3:shards(mem3:dbname(DbName)),
- lists:map(fun(Shard) ->
- rexi:cast(Shard#shard.node, {ken_server, add, [Shard#shard.name]})
- end, Shards)
- catch error:database_does_not_exist ->
- ok
+ lists:map(
+ fun(Shard) ->
+ rexi:cast(Shard#shard.node, {ken_server, add, [Shard#shard.name]})
+ end,
+ Shards
+ )
+ catch
+ error:database_does_not_exist ->
+ ok
end.
%% @doc Changes the configured value for a batch size.
@@ -124,66 +130,64 @@ terminate(_Reason, _State) ->
handle_call({set_batch_size, BS}, _From, #state{batch_size = Old} = State) ->
{reply, Old, State#state{batch_size = BS}, 0};
-
handle_call({set_delay, Delay}, _From, #state{delay = Old} = State) ->
{reply, Old, State#state{delay = Delay}, 0};
-
handle_call({set_limit, Limit}, _From, #state{limit = Old} = State) ->
{reply, Old, State#state{limit = Limit}, 0};
-
-handle_call({set_prune_interval, Interval}, _From , State) ->
+handle_call({set_prune_interval, Interval}, _From, State) ->
Old = State#state.prune_interval,
{reply, Old, State#state{prune_interval = Interval}, 0};
-
handle_call(Msg, From, State) ->
{stop, {unknown_call, Msg, From}, State}.
% Queues a DB to (maybe) have indexing jobs spawned.
handle_cast({add, DbName}, State) ->
case ets:insert_new(ken_pending, {DbName}) of
- true ->
- {noreply, State#state{q = queue:in(DbName, State#state.q)}, 0};
- false ->
- {noreply, State, 0}
+ true ->
+ {noreply, State#state{q = queue:in(DbName, State#state.q)}, 0};
+ false ->
+ {noreply, State, 0}
end;
-
handle_cast({remove, DbName}, State) ->
Q2 = queue:filter(fun(X) -> X =/= DbName end, State#state.q),
ets:delete(ken_pending, DbName),
% Delete search index workers
- ets:match_delete(ken_workers, #job{name={DbName,'_','_'}, _='_'}),
+ ets:match_delete(ken_workers, #job{name = {DbName, '_', '_'}, _ = '_'}),
% Delete view index workers
- ets:match_delete(ken_workers, #job{name={DbName,'_'}, _='_'}),
+ ets:match_delete(ken_workers, #job{name = {DbName, '_'}, _ = '_'}),
% TODO kill off active jobs for this DB as well
{noreply, State#state{q = Q2}, 0};
-
handle_cast({resubmit, DbName}, State) ->
ets:delete(ken_resubmit, DbName),
handle_cast({add, DbName}, State);
-
% st index job names have 3 elements, 3rd being 'hastings'. See job record definition.
-handle_cast({trigger_update, #job{name={_, _, hastings}, server=GPid, seq=Seq} = Job}, State) ->
+handle_cast({trigger_update, #job{name = {_, _, hastings}, server = GPid, seq = Seq} = Job}, State) ->
% hastings_index:await will trigger a hastings index update
- {Pid, _} = erlang:spawn_monitor(hastings_index, await,
- [GPid, Seq]),
+ {Pid, _} = erlang:spawn_monitor(
+ hastings_index,
+ await,
+ [GPid, Seq]
+ ),
Now = erlang:monotonic_time(),
ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}),
{noreply, State, 0};
% search index job names have 3 elements. See job record definition.
-handle_cast({trigger_update, #job{name={_,_,_}, server=GPid, seq=Seq} = Job}, State) ->
+handle_cast({trigger_update, #job{name = {_, _, _}, server = GPid, seq = Seq} = Job}, State) ->
% dreyfus_index:await will trigger a search index update.
- {Pid, _} = erlang:spawn_monitor(dreyfus_index, await,
- [GPid, Seq]),
+ {Pid, _} = erlang:spawn_monitor(
+ dreyfus_index,
+ await,
+ [GPid, Seq]
+ ),
Now = erlang:monotonic_time(),
ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}),
{noreply, State, 0};
-handle_cast({trigger_update, #job{name={_,_}, server=SrvPid, seq=Seq} = Job}, State) ->
+handle_cast({trigger_update, #job{name = {_, _}, server = SrvPid, seq = Seq} = Job}, State) ->
% couch_index:get_state/2 will trigger a view group index update.
{Pid, _} = erlang:spawn_monitor(couch_index, get_state, [SrvPid, Seq]),
Now = erlang:monotonic_time(),
ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}),
{noreply, State, 0};
-
handle_cast(Msg, State) ->
{stop, {unknown_cast, Msg}, State}.
@@ -191,37 +195,33 @@ handle_info({gen_event_EXIT, ken_event_handler, Reason}, State) ->
couch_log:error("ken_event_handler terminated: ~w", [Reason]),
erlang:send_after(5000, self(), start_event_handler),
{ok, State, 0};
-
handle_info(start_event_handler, State) ->
case ken_event_handler:start_link() of
- {ok, _Pid} ->
- ok;
- Error ->
- couch_log:error("ken_event_handler init: ~w", [Error]),
- erlang:send_after(5000, self(), start_event_handler)
+ {ok, _Pid} ->
+ ok;
+ Error ->
+ couch_log:error("ken_event_handler init: ~w", [Error]),
+ erlang:send_after(5000, self(), start_event_handler)
end,
{noreply, State, 0};
-
handle_info(timeout, #state{prune_interval = I, pruned_last = Last} = State) ->
Now = erlang:monotonic_time(),
Interval = erlang:convert_time_unit(
- State#state.delay, millisecond, native),
+ State#state.delay, millisecond, native
+ ),
case Now - Last > Interval of
- true ->
- NewState = prune_worker_table(State);
- _ ->
- NewState = State
+ true ->
+ NewState = prune_worker_table(State);
+ _ ->
+ NewState = State
end,
{noreply, maybe_start_next_queued_job(NewState), I};
-
-handle_info({'DOWN', _, _, Pid, Reason}, #state{dbworker = {Name,Pid}} = St) ->
+handle_info({'DOWN', _, _, Pid, Reason}, #state{dbworker = {Name, Pid}} = St) ->
maybe_resubmit(Name, Reason),
- {noreply, St#state{dbworker=nil}, 0};
-
+ {noreply, St#state{dbworker = nil}, 0};
handle_info({'DOWN', _, _, Pid, Reason}, State) ->
debrief_worker(Pid, Reason, State),
{noreply, State, 0};
-
handle_info(Msg, State) ->
{stop, {unknown_info, Msg}, State}.
@@ -230,31 +230,32 @@ code_change(_OldVsn, State, _Extra) ->
%% private functions
-maybe_start_next_queued_job(#state{dbworker = {_,_}} = State) ->
+maybe_start_next_queued_job(#state{dbworker = {_, _}} = State) ->
State;
-maybe_start_next_queued_job(#state{q=Q} = State) ->
+maybe_start_next_queued_job(#state{q = Q} = State) ->
IncrementalChannels = list_to_integer(config("incremental_channels", "80")),
BatchChannels = list_to_integer(config("batch_channels", "20")),
TotalChannels = IncrementalChannels + BatchChannels,
case queue:out(Q) of
- {{value, DbName}, Q2} ->
- case skip_job(DbName) of
- true ->
- % job is either being resubmitted or ignored, skip it
- ets:delete(ken_pending, DbName),
- maybe_start_next_queued_job(State#state{q = Q2});
- false ->
- case get_active_count() of A when A < TotalChannels ->
- Args = [DbName, State],
- {Pid, _} = spawn_monitor(?MODULE, update_db_indexes, Args),
- ets:delete(ken_pending, DbName),
- State#state{dbworker = {DbName,Pid}, q = Q2};
- _ ->
- State#state{q = queue:in_r(DbName, Q2)}
- end
- end;
- {empty, Q} ->
- State
+ {{value, DbName}, Q2} ->
+ case skip_job(DbName) of
+ true ->
+ % job is either being resubmitted or ignored, skip it
+ ets:delete(ken_pending, DbName),
+ maybe_start_next_queued_job(State#state{q = Q2});
+ false ->
+ case get_active_count() of
+ A when A < TotalChannels ->
+ Args = [DbName, State],
+ {Pid, _} = spawn_monitor(?MODULE, update_db_indexes, Args),
+ ets:delete(ken_pending, DbName),
+ State#state{dbworker = {DbName, Pid}, q = Q2};
+ _ ->
+ State#state{q = queue:in_r(DbName, Q2)}
+ end
+ end;
+ {empty, Q} ->
+ State
end.
skip_job(DbName) ->
@@ -262,28 +263,35 @@ skip_job(DbName) ->
ignore_db(DbName) ->
case config:get("ken.ignore", ?b2l(DbName), false) of
- "true" ->
- true;
- _ ->
- false
+ "true" ->
+ true;
+ _ ->
+ false
end.
get_active_count() ->
- MatchSpec = [{#job{worker_pid='$1', _='_'}, [{is_pid, '$1'}], [true]}],
+ MatchSpec = [{#job{worker_pid = '$1', _ = '_'}, [{is_pid, '$1'}], [true]}],
ets:select_count(ken_workers, MatchSpec).
% If any indexing job fails, resubmit requests for all indexes.
update_db_indexes(Name, State) ->
{ok, DDocs} = design_docs(Name),
RandomSorted = lists:sort([{rand:uniform(), D} || D <- DDocs]),
- Resubmit = lists:foldl(fun({_, DDoc}, Acc) ->
- JsonDDoc = couch_doc:from_json_obj(DDoc),
- case update_ddoc_indexes(Name, JsonDDoc, State) of
- ok -> Acc;
- _ -> true
- end
- end, false, RandomSorted),
- if Resubmit -> exit(resubmit); true -> ok end.
+ Resubmit = lists:foldl(
+ fun({_, DDoc}, Acc) ->
+ JsonDDoc = couch_doc:from_json_obj(DDoc),
+ case update_ddoc_indexes(Name, JsonDDoc, State) of
+ ok -> Acc;
+ _ -> true
+ end
+ end,
+ false,
+ RandomSorted
+ ),
+ if
+ Resubmit -> exit(resubmit);
+ true -> ok
+ end.
design_docs(Name) ->
try
@@ -293,27 +301,32 @@ design_docs(Name) ->
Else ->
Else
end
- catch error:database_does_not_exist ->
- {ok, []}
+ catch
+ error:database_does_not_exist ->
+ {ok, []}
end.
% Returns an error if any job creation fails.
-update_ddoc_indexes(Name, #doc{}=Doc, State) ->
- {ok, Db} = case couch_db:open_int(Name, []) of
- {ok, _} = Resp -> Resp;
- Else -> exit(Else)
- end,
+update_ddoc_indexes(Name, #doc{} = Doc, State) ->
+ {ok, Db} =
+ case couch_db:open_int(Name, []) of
+ {ok, _} = Resp -> Resp;
+ Else -> exit(Else)
+ end,
Seq = couch_db:get_update_seq(Db),
couch_db:close(Db),
- ViewUpdated = case should_update(Doc, <<"views">>) of true ->
- try couch_mrview_util:ddoc_to_mrst(Name, Doc) of
- {ok, MRSt} -> update_ddoc_views(Name, MRSt, Seq, State)
- catch _:_ ->
- ok
- end;
- false ->
- ok
- end,
+ ViewUpdated =
+ case should_update(Doc, <<"views">>) of
+ true ->
+ try couch_mrview_util:ddoc_to_mrst(Name, Doc) of
+ {ok, MRSt} -> update_ddoc_views(Name, MRSt, Seq, State)
+ catch
+ _:_ ->
+ ok
+ end;
+ false ->
+ ok
+ end,
SearchUpdated = search_updated(Name, Doc, Seq, State),
STUpdated = st_updated(Name, Doc, Seq, State),
case {ViewUpdated, SearchUpdated, STUpdated} of
@@ -323,14 +336,16 @@ update_ddoc_indexes(Name, #doc{}=Doc, State) ->
-ifdef(HAVE_DREYFUS).
search_updated(Name, Doc, Seq, State) ->
- case should_update(Doc, <<"indexes">>) of true ->
- try dreyfus_index:design_doc_to_indexes(Doc) of
- SIndexes -> update_ddoc_search_indexes(Name, SIndexes, Seq, State)
- catch _:_ ->
+ case should_update(Doc, <<"indexes">>) of
+ true ->
+ try dreyfus_index:design_doc_to_indexes(Doc) of
+ SIndexes -> update_ddoc_search_indexes(Name, SIndexes, Seq, State)
+ catch
+ _:_ ->
+ ok
+ end;
+ false ->
ok
- end;
- false ->
- ok
end.
-else.
search_updated(_Name, _Doc, _Seq, _State) ->
@@ -339,22 +354,23 @@ search_updated(_Name, _Doc, _Seq, _State) ->
-ifdef(HAVE_HASTINGS).
st_updated(Name, Doc, Seq, State) ->
- case should_update(Doc, <<"st_indexes">>) of true ->
- try
- hastings_index:design_doc_to_indexes(Doc) of
- STIndexes -> update_ddoc_st_indexes(Name, STIndexes, Seq, State)
- catch _:_ ->
+ case should_update(Doc, <<"st_indexes">>) of
+ true ->
+ try hastings_index:design_doc_to_indexes(Doc) of
+ STIndexes -> update_ddoc_st_indexes(Name, STIndexes, Seq, State)
+ catch
+ _:_ ->
+ ok
+ end;
+ false ->
ok
- end;
- false ->
- ok
end.
-else.
st_updated(_Name, _Doc, _Seq, _State) ->
ok.
-endif.
-should_update(#doc{body={Props}}, IndexType) ->
+should_update(#doc{body = {Props}}, IndexType) ->
case couch_util:get_value(<<"autoupdate">>, Props) of
false ->
false;
@@ -373,47 +389,66 @@ update_ddoc_views(Name, MRSt, Seq, State) ->
Language = couch_mrview_index:get(language, MRSt),
Allowed = lists:member(Language, allowed_languages()),
Views = couch_mrview_index:get(views, MRSt),
- if Allowed andalso Views =/= [] ->
- {ok, Pid} = couch_index_server:get_index(couch_mrview_index, MRSt),
- GroupName = couch_mrview_index:get(idx_name, MRSt),
- maybe_start_job({Name, GroupName}, Pid, Seq, State);
- true -> ok end.
+ if
+ Allowed andalso Views =/= [] ->
+ {ok, Pid} = couch_index_server:get_index(couch_mrview_index, MRSt),
+ GroupName = couch_mrview_index:get(idx_name, MRSt),
+ maybe_start_job({Name, GroupName}, Pid, Seq, State);
+ true ->
+ ok
+ end.
-ifdef(HAVE_DREYFUS).
update_ddoc_search_indexes(DbName, Indexes, Seq, State) ->
- if Indexes =/= [] ->
- % Spawn a job for each search index in the ddoc
- lists:foldl(fun(#index{name=IName, ddoc_id=DDocName}=Index, Acc) ->
- case dreyfus_index_manager:get_index(DbName, Index) of
- {ok, Pid} ->
- case maybe_start_job({DbName, DDocName, IName}, Pid, Seq, State) of
- resubmit -> resubmit;
- _ -> Acc
- end;
- _ ->
- % If any job fails, retry the db.
- resubmit
- end end, ok, Indexes);
- true -> ok end.
+ if
+ Indexes =/= [] ->
+ % Spawn a job for each search index in the ddoc
+ lists:foldl(
+ fun(#index{name = IName, ddoc_id = DDocName} = Index, Acc) ->
+ case dreyfus_index_manager:get_index(DbName, Index) of
+ {ok, Pid} ->
+ case maybe_start_job({DbName, DDocName, IName}, Pid, Seq, State) of
+ resubmit -> resubmit;
+ _ -> Acc
+ end;
+ _ ->
+ % If any job fails, retry the db.
+ resubmit
+ end
+ end,
+ ok,
+ Indexes
+ );
+ true ->
+ ok
+ end.
-endif.
-ifdef(HAVE_HASTINGS).
update_ddoc_st_indexes(DbName, Indexes, Seq, State) ->
- if Indexes =/= [] ->
- % The record name in hastings is #h_idx rather than #index as it is for dreyfus
- % Spawn a job for each spatial index in the ddoc
- lists:foldl(fun(#h_idx{ddoc_id=DDocName}=Index, Acc) ->
- case hastings_index_manager:get_index(DbName, Index) of
- {ok, Pid} ->
- case maybe_start_job({DbName, DDocName, hastings}, Pid, Seq, State) of
- resubmit -> resubmit;
- _ -> Acc
- end;
- _ ->
- % If any job fails, retry the db.
- resubmit
- end end, ok, Indexes);
- true -> ok end.
+ if
+ Indexes =/= [] ->
+ % The record name in hastings is #h_idx rather than #index as it is for dreyfus
+ % Spawn a job for each spatial index in the ddoc
+ lists:foldl(
+ fun(#h_idx{ddoc_id = DDocName} = Index, Acc) ->
+ case hastings_index_manager:get_index(DbName, Index) of
+ {ok, Pid} ->
+ case maybe_start_job({DbName, DDocName, hastings}, Pid, Seq, State) of
+ resubmit -> resubmit;
+ _ -> Acc
+ end;
+ _ ->
+ % If any job fails, retry the db.
+ resubmit
+ end
+ end,
+ ok,
+ Indexes
+ );
+ true ->
+ ok
+ end.
-endif.
should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) ->
@@ -424,48 +459,49 @@ should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) ->
A = get_active_count(),
#state{delay = Delay, batch_size = BS} = State,
case ets:lookup(ken_workers, Name) of
- [] ->
- if
- A < BatchChannels ->
- true;
- A < TotalChannels ->
- case Name of
- % st_index name has three elements
- {_, _, hastings} ->
- {ok, CurrentSeq} = hastings_index:await(Pid, 0),
- (Seq - CurrentSeq) < Threshold;
- % View name has two elements.
- {_,_} ->
- % Since seq is 0, couch_index:get_state/2 won't
- % spawn an index update.
- {ok, MRSt} = couch_index:get_state(Pid, 0),
- CurrentSeq = couch_mrview_index:get(update_seq, MRSt),
- (Seq - CurrentSeq) < Threshold;
- % Search name has three elements.
- {_,_,_} ->
- {ok, _IndexPid, CurrentSeq} = dreyfus_index:await(Pid, 0),
- (Seq - CurrentSeq) < Threshold;
- _ -> % Should never happen, but if it does, ignore.
- false
+ [] ->
+ if
+ A < BatchChannels ->
+ true;
+ A < TotalChannels ->
+ case Name of
+ % st_index name has three elements
+ {_, _, hastings} ->
+ {ok, CurrentSeq} = hastings_index:await(Pid, 0),
+ (Seq - CurrentSeq) < Threshold;
+ % View name has two elements.
+ {_, _} ->
+ % Since seq is 0, couch_index:get_state/2 won't
+ % spawn an index update.
+ {ok, MRSt} = couch_index:get_state(Pid, 0),
+ CurrentSeq = couch_mrview_index:get(update_seq, MRSt),
+ (Seq - CurrentSeq) < Threshold;
+ % Search name has three elements.
+ {_, _, _} ->
+ {ok, _IndexPid, CurrentSeq} = dreyfus_index:await(Pid, 0),
+ (Seq - CurrentSeq) < Threshold;
+ % Should never happen, but if it does, ignore.
+ _ ->
+ false
end;
- true ->
- false
- end;
- [#job{worker_pid = nil, lru = LRU, seq = OldSeq}] ->
- Now = erlang:monotonic_time(),
- DeltaT = erlang:convert_time_unit(Now - LRU, native, millisecond),
- if
- A < BatchChannels, (Seq - OldSeq) >= BS ->
- true;
- A < BatchChannels, DeltaT > Delay ->
- true;
- A < TotalChannels, (Seq - OldSeq) < Threshold, DeltaT > Delay ->
- true;
- true ->
- false
- end;
- _ ->
- false
+ true ->
+ false
+ end;
+ [#job{worker_pid = nil, lru = LRU, seq = OldSeq}] ->
+ Now = erlang:monotonic_time(),
+ DeltaT = erlang:convert_time_unit(Now - LRU, native, millisecond),
+ if
+ A < BatchChannels, (Seq - OldSeq) >= BS ->
+ true;
+ A < BatchChannels, DeltaT > Delay ->
+ true;
+ A < TotalChannels, (Seq - OldSeq) < Threshold, DeltaT > Delay ->
+ true;
+ true ->
+ false
+ end;
+ _ ->
+ false
end.
maybe_start_job(JobName, IndexPid, Seq, State) ->
@@ -475,24 +511,25 @@ maybe_start_job(JobName, IndexPid, Seq, State) ->
seq = Seq
},
case should_start_job(Job, State) of
- true ->
- gen_server:cast(?MODULE, {trigger_update, Job});
- false ->
- resubmit
+ true ->
+ gen_server:cast(?MODULE, {trigger_update, Job});
+ false ->
+ resubmit
end.
debrief_worker(Pid, Reason, _State) ->
- case ets:match_object(ken_workers, #job{worker_pid=Pid, _='_'}) of
- [#job{name = Name} = Job] ->
- case Name of
- {DbName,_} ->
- maybe_resubmit(DbName, Reason);
- {DbName,_,_} ->
- maybe_resubmit(DbName, Reason)
- end,
- ets:insert(ken_workers, Job#job{worker_pid = nil});
- [] -> % should never happen, but if it does, ignore
- ok
+ case ets:match_object(ken_workers, #job{worker_pid = Pid, _ = '_'}) of
+ [#job{name = Name} = Job] ->
+ case Name of
+ {DbName, _} ->
+ maybe_resubmit(DbName, Reason);
+ {DbName, _, _} ->
+ maybe_resubmit(DbName, Reason)
+ end,
+ ets:insert(ken_workers, Job#job{worker_pid = nil});
+ % should never happen, but if it does, ignore
+ [] ->
+ ok
end.
maybe_resubmit(_DbName, normal) ->
@@ -519,14 +556,15 @@ prune_worker_table(State) ->
Delay = erlang:convert_time_unit(State#state.delay, millisecond, native),
C = erlang:monotonic_time() - Delay,
%% fun(#job{worker_pid=nil, lru=A) when A < C -> true end
- MatchHead = #job{worker_pid=nil, lru='$1', _='_'},
+ MatchHead = #job{worker_pid = nil, lru = '$1', _ = '_'},
Guard = {'<', '$1', C},
ets:select_delete(ken_workers, [{MatchHead, [Guard], [true]}]),
State#state{pruned_last = erlang:monotonic_time()}.
allowed_languages() ->
- Config = couch_proc_manager:get_servers_from_env("COUCHDB_QUERY_SERVER_") ++
- couch_proc_manager:get_servers_from_env("COUCHDB_NATIVE_QUERY_SERVER_"),
+ Config =
+ couch_proc_manager:get_servers_from_env("COUCHDB_QUERY_SERVER_") ++
+ couch_proc_manager:get_servers_from_env("COUCHDB_NATIVE_QUERY_SERVER_"),
Allowed = [list_to_binary(string:to_lower(Lang)) || {Lang, _Cmd} <- Config],
[<<"query">> | Allowed].
@@ -536,8 +574,6 @@ config(Key, Default) ->
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-
-
prune_old_entries_test() ->
{
setup,
@@ -548,15 +584,19 @@ prune_old_entries_test() ->
catch ets:delete(ken_workers)
end,
?_test(begin
- lists:foreach(fun(Idx) ->
- ets:insert(ken_workers, #job{name=Idx}),
- timer:sleep(100)
- end, lists:seq(1, 3)),
- prune_worker_table(#state{delay=250}),
+ lists:foreach(
+ fun(Idx) ->
+ ets:insert(ken_workers, #job{name = Idx}),
+ timer:sleep(100)
+ end,
+ lists:seq(1, 3)
+ ),
+ prune_worker_table(#state{delay = 250}),
?assertEqual(
[2, 3],
lists:usort(
- [N || #job{name = N} <- ets:tab2list(ken_workers)])
+ [N || #job{name = N} <- ets:tab2list(ken_workers)]
+ )
),
ok
end)