diff options
author | Robert Newson <rnewson@apache.org> | 2019-02-06 20:07:46 +0000 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2019-02-07 18:16:46 +0000 |
commit | 390b91949220366a0c64aa74bffb46fa0b22c4b6 (patch) | |
tree | c364f9a6703841b3b0a4b34327d3bcdf820cb69d | |
download | couchdb-390b91949220366a0c64aa74bffb46fa0b22c4b6.tar.gz |
Import ken
-rw-r--r-- | README.md | 12 | ||||
-rw-r--r-- | rebar.config.script | 26 | ||||
-rw-r--r-- | src/ken.app.src.script | 38 | ||||
-rw-r--r-- | src/ken.erl | 29 | ||||
-rw-r--r-- | src/ken_app.erl | 28 | ||||
-rw-r--r-- | src/ken_event_handler.erl | 56 | ||||
-rw-r--r-- | src/ken_server.erl | 524 | ||||
-rw-r--r-- | src/ken_sup.erl | 33 | ||||
-rw-r--r-- | test/config.ini | 2 | ||||
-rw-r--r-- | test/ken_server_test.erl | 99 |
10 files changed, 847 insertions, 0 deletions
diff --git a/README.md b/README.md new file mode 100644 index 000000000..a5a657611 --- /dev/null +++ b/README.md @@ -0,0 +1,12 @@ +ken +=== + +Ken builds views and search indexes. Automatically. + +#### Overview + +When the couch\_db\_update event is triggered with an `updated` event, ken will spawn indexing jobs for view groups and search indexes (one job per view group shard or search index shard). If a `deleted` event is triggered, all jobs associated with the corresponding database shard will be removed. + +#### Testing + +Tests for ken are expected to be executed from the top-level `couchdb` repo as part of a `make check` run. The ken tests can also be run in isolation with `rebar eunit apps=ken verbose=1` from the `couchdb` root directory. diff --git a/rebar.config.script b/rebar.config.script new file mode 100644 index 000000000..4570bfc1b --- /dev/null +++ b/rebar.config.script @@ -0,0 +1,26 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}. +HaveHastings = code:lib_dir(hastings) /= {error, bad_name}. + +CurrOpts = case lists:keyfind(erl_opts, 1, CONFIG) of + {erl_opts, Opts} -> Opts; + false -> [] +end, + +NewOpts = + if HaveDreyfus -> [{d, 'HAVE_DREYFUS'}]; true -> [] end ++ + if HaveHastings -> [{d, 'HAVE_HASTINGS'}]; true -> [] end ++ + [{i, "../"}] ++ CurrOpts. + +lists:keystore(erl_opts, 1, CONFIG, {erl_opts, NewOpts}). 
diff --git a/src/ken.app.src.script b/src/ken.app.src.script new file mode 100644 index 000000000..dcf4a23d1 --- /dev/null +++ b/src/ken.app.src.script @@ -0,0 +1,38 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}. +HaveHastings = code:lib_dir(hastings) /= {error, bad_name}. + +BaseApplications = [ + kernel, + stdlib, + couch_log, + couch_event, + couch, + config +]. + +Applications = + if HaveDreyfus -> [dreyfus]; true -> [] end ++ + if HaveHastings -> [hastings]; true -> [] end ++ + BaseApplications. + +{application, ken, + [ + {description, ""}, + {vsn, git}, + {registered, []}, + {applications, Applications}, + {mod, { ken_app, []}}, + {env, []} + ]}. diff --git a/src/ken.erl b/src/ken.erl new file mode 100644 index 000000000..87a724ba1 --- /dev/null +++ b/src/ken.erl @@ -0,0 +1,29 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ken). + +-export([add/1]). +-export([remove/1]). 
+-export([add_all_shards/1]). + +% Add a database shard to be indexed. +add(DbName) -> + ken_server:add(DbName). + +% Remove all pending jobs for a database shard. +remove(DbName) -> + ken_server:remove(DbName). + +% Add all shards for a database to be indexed. +add_all_shards(DbName) -> + ken_server:add_all_shards(DbName). diff --git a/src/ken_app.erl b/src/ken_app.erl new file mode 100644 index 000000000..15f235d42 --- /dev/null +++ b/src/ken_app.erl @@ -0,0 +1,28 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ken_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + ken_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/ken_event_handler.erl b/src/ken_event_handler.erl new file mode 100644 index 000000000..8f158f425 --- /dev/null +++ b/src/ken_event_handler.erl @@ -0,0 +1,56 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ken_event_handler). +-behaviour(couch_event_listener). + +-export([ + start_link/0 +]). + +-export([ + init/1, + terminate/2, + handle_event/3, + handle_cast/2, + handle_info/2 +]). + + +start_link() -> + couch_event_listener:start_link(?MODULE, nil, [all_dbs]). + +%% couch_event_listener callbacks + +init(_) -> + {ok, nil}. + +terminate(_Reason, _State) -> + ok. + +handle_event(DbName, updated, State) -> + ken:add(DbName), + {ok, State}; +handle_event(DbName, deleted, State) -> + ken:remove(DbName), + {ok, State}; +handle_event(DbName, ddoc_updated, State) -> + ken:add_all_shards(DbName), + {ok, State}; +handle_event(_DbName, _Event, State) -> + {ok, State}. + +handle_cast(_Msg, State) -> + {ok, State}. + +handle_info(_Msg, State) -> + {ok, State}. diff --git a/src/ken_server.erl b/src/ken_server.erl new file mode 100644 index 000000000..25da5ac06 --- /dev/null +++ b/src/ken_server.erl @@ -0,0 +1,524 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ken_server). + +% gen_server boilerplate +-behaviour(gen_server). +-vsn(1). 
-export([init/1, terminate/2]).
-export([handle_call/3, handle_cast/2, handle_info/2, code_change/3]).

% Public interface
-export([start_link/0]).
-export([add/1]).
-export([remove/1]).
-export([add_all_shards/1]).
-export([set_batch_size/1]).
-export([set_delay/1]).
-export([set_limit/1]).
-export([set_prune_interval/1]).

% exports for spawn
-export([update_db_indexes/2]).

% One entry of the ken_workers ETS table (keyed on #job.name).
-record(job, {
    name, % {DbName, GroupId} for view. {DbName, DDocId, IndexId} for search.
          % {DbName, DDocId, hastings} for spatial (st) indexes.
    server, % Pid of either view group or search index
    worker_pid = nil, % pid of the monitored update worker, nil when idle
    seq = 0, % update sequence the job was asked to reach
    % {MegaSecs, Secs, MicroSecs} of the last trigger. os:timestamp/0 yields
    % the same tuple shape as the deprecated erlang:now/0.
    lru = os:timestamp()
}).

% gen_server state.
-record(state, {
    q = queue:new(), % queue of database shard names waiting to be examined
    dbworker = nil, % {DbName, Pid} of the process examining a shard, or nil
    limit = 20,
    delay = 5000, % milliseconds
    batch_size = 1,
    prune_interval = 60000, % milliseconds
    pruned_last % timestamp of the last ken_workers pruning
}).

-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").

-ifdef(HAVE_DREYFUS).
-include_lib("dreyfus/include/dreyfus.hrl").
-endif.

-ifdef(HAVE_HASTINGS).
-include_lib("hastings/src/hastings.hrl").
-endif.

%% @doc Starts the server and registers it locally under the module name.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% @doc Adds a database shard to be indexed
-spec add(binary()) -> ok.
add(DbName) ->
    gen_server:cast(?MODULE, {add, DbName}).

%% @doc Removes all the pending jobs for a database shard.
-spec remove(binary()) -> ok.
remove(DbName) ->
    gen_server:cast(?MODULE, {remove, DbName}).

%% @doc Adds all the shards for a database to be indexed.
%% Each shard is announced to the ken_server on the node that hosts it.
%% A database that does not exist is silently ignored.
-spec add_all_shards(binary()) -> ok.
add_all_shards(DbName) ->
    try
        Shards = mem3:shards(mem3:dbname(DbName)),
        % foreach (not map): the per-shard results are not needed and the
        % spec promises a plain `ok`.
        lists:foreach(fun(Shard) ->
            rexi:cast(Shard#shard.node, {ken_server, add, [Shard#shard.name]})
        end, Shards)
    catch error:database_does_not_exist ->
        ok
    end.

%% @doc Changes the configured value for a batch size.
%% Returns previous value.
-spec set_batch_size(pos_integer()) -> pos_integer().
set_batch_size(BS) when is_integer(BS), BS > 0 ->
    gen_server:call(?MODULE, {set_batch_size, BS}).

%% @doc Changes the configured value for a delay between batches.
%% Returns previous value.
-spec set_delay(non_neg_integer()) -> non_neg_integer().
set_delay(Delay) when is_integer(Delay), Delay >= 0 ->
    gen_server:call(?MODULE, {set_delay, Delay}).

%% @doc Changes the configured value for a limit.
%% Returns previous value.
-spec set_limit(pos_integer()) -> pos_integer().
set_limit(Limit) when is_integer(Limit), Limit > 0 ->
    gen_server:call(?MODULE, {set_limit, Limit}).

%% @doc Changes the configured value for a prune interval.
%% Returns previous value.
%% Note: the guard only accepts intervals strictly greater than 1000;
%% anything else fails with function_clause.
-spec set_prune_interval(pos_integer()) -> pos_integer().
set_prune_interval(Interval) when is_integer(Interval), Interval > 1000 ->
    gen_server:call(?MODULE, {set_prune_interval, Interval}).

%% gen_server callbacks

% Creates the three named ETS tables used by the server, schedules the start
% of the event handler and reads the initial limit from the "ken" config
% section (default "20").
init(_) ->
    erlang:send(self(), start_event_handler),
    ets:new(ken_pending, [named_table]),
    ets:new(ken_resubmit, [named_table]),
    ets:new(ken_workers, [named_table, public, {keypos, #job.name}]),
    Limit = list_to_integer(config("limit", "20")),
    % os:timestamp/0 replaces the deprecated erlang:now/0; both return a
    % {MegaSecs, Secs, MicroSecs} tuple usable with timer:now_diff/2.
    {ok, #state{pruned_last = os:timestamp(), limit = Limit}}.

terminate(_Reason, _State) ->
    ok.

% Every setter replies with the previous value and returns a zero timeout so
% that the `timeout` handler runs right after the call.
handle_call({set_batch_size, BS}, _From, #state{batch_size = Old} = State) ->
    {reply, Old, State#state{batch_size = BS}, 0};

handle_call({set_delay, Delay}, _From, #state{delay = Old} = State) ->
    {reply, Old, State#state{delay = Delay}, 0};

handle_call({set_limit, Limit}, _From, #state{limit = Old} = State) ->
    {reply, Old, State#state{limit = Limit}, 0};

handle_call({set_prune_interval, Interval}, _From, State) ->
    Old = State#state.prune_interval,
    {reply, Old, State#state{prune_interval = Interval}, 0};

% Unknown calls stop the server.
handle_call(Msg, From, State) ->
    {stop, {unknown_call, Msg, From}, State}.

% Queues a DB to (maybe) have indexing jobs spawned.
% {add, DbName}: enqueue the shard unless it is already pending.
handle_cast({add, DbName}, State) ->
    case ets:insert_new(ken_pending, {DbName}) of
        true ->
            {noreply, State#state{q = queue:in(DbName, State#state.q)}, 0};
        false ->
            {noreply, State, 0}
    end;

% {remove, DbName}: drop the shard from the queue and forget its jobs.
handle_cast({remove, DbName}, State) ->
    Q2 = queue:filter(fun(X) -> X =/= DbName end, State#state.q),
    ets:delete(ken_pending, DbName),
    % Delete search index workers
    ets:match_delete(ken_workers, #job{name={DbName,'_','_'}, _='_'}),
    % Delete view index workers
    ets:match_delete(ken_workers, #job{name={DbName,'_'}, _='_'}),
    % TODO kill off active jobs for this DB as well
    {noreply, State#state{q = Q2}, 0};

% {resubmit, DbName}: the resubmit delay elapsed, queue the shard again.
handle_cast({resubmit, DbName}, State) ->
    ets:delete(ken_resubmit, DbName),
    handle_cast({add, DbName}, State);

% st index job names have 3 elements, 3rd being 'hastings'. See job record definition.
% This clause must precede the generic 3-tuple (search) clause below.
handle_cast({trigger_update, #job{name={_, _, hastings}} = Job}, State) ->
    % hastings_index:await will trigger a hastings index update
    start_update_worker(hastings_index, await, Job),
    {noreply, State, 0};
% search index job names have 3 elements. See job record definition.
handle_cast({trigger_update, #job{name={_,_,_}} = Job}, State) ->
    % dreyfus_index:await will trigger a search index update.
    start_update_worker(dreyfus_index, await, Job),
    {noreply, State, 0};
handle_cast({trigger_update, #job{name={_,_}} = Job}, State) ->
    % couch_index:get_state/2 will trigger a view group index update.
    start_update_worker(couch_index, get_state, Job),
    {noreply, State, 0};

% Unknown casts stop the server.
handle_cast(Msg, State) ->
    {stop, {unknown_cast, Msg}, State}.

% Spawns a monitored process calling Mod:Fun(IndexServerPid, Seq) and records
% it in ken_workers as the active worker of Job. The 'DOWN' message of the
% monitor is handled in handle_info/2.
start_update_worker(Mod, Fun, #job{server = Server, seq = Seq} = Job) ->
    {Pid, _Ref} = erlang:spawn_monitor(Mod, Fun, [Server, Seq]),
    % os:timestamp/0 replaces the deprecated erlang:now/0 (same tuple shape).
    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = os:timestamp()}).

% The event handler went away: log it and try to start it again in 5 seconds.
handle_info({gen_event_EXIT, ken_event_handler, Reason}, State) ->
    couch_log:error("ken_event_handler terminated: ~w", [Reason]),
    erlang:send_after(5000, self(), start_event_handler),
    % Must be `noreply`: {ok, State, Timeout} is not a valid gen_server
    % handle_info/2 return value and would crash the server.
    {noreply, State, 0};

% (Re)starts the event handler; on failure retries after 5 seconds.
handle_info(start_event_handler, State) ->
    case ken_event_handler:start_link() of
        {ok, _Pid} ->
            ok;
        Error ->
            couch_log:error("ken_event_handler init: ~w", [Error]),
            erlang:send_after(5000, self(), start_event_handler)
    end,
    {noreply, State, 0};

% Idle tick: prune the worker table when the prune interval (milliseconds)
% has elapsed, then try to start the next queued shard.
handle_info(timeout, #state{prune_interval = I, pruned_last = Last} = State) ->
    % timer:now_diff/2 returns microseconds, hence the factor of 1000.
    % os:timestamp/0 replaces the deprecated erlang:now/0.
    NewState = case timer:now_diff(os:timestamp(), Last) of
        Elapsed when Elapsed > (1000 * I) ->
            prune_worker_table(State);
        _ ->
            State
    end,
    {noreply, maybe_start_next_queued_job(NewState), I};

% The process examining a database shard finished.
handle_info({'DOWN', _, _, Pid, Reason}, #state{dbworker = {Name,Pid}} = St) ->
    maybe_resubmit(Name, Reason),
    {noreply, St#state{dbworker=nil}, 0};

% An index update worker finished.
handle_info({'DOWN', _, _, Pid, Reason}, State) ->
    debrief_worker(Pid, Reason, State),
    {noreply, State, 0};

% Unknown messages stop the server.
handle_info(Msg, State) ->
    {stop, {unknown_info, Msg}, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% private functions

% Takes the next shard off the queue and spawns a process to examine its
% design documents. Does nothing while another shard is being examined or
% when the queue is empty. Shards that are scheduled for resubmission or
% ignored by configuration are dropped and the next one is tried.
maybe_start_next_queued_job(#state{dbworker = {_,_}} = State) ->
    State;
maybe_start_next_queued_job(#state{q=Q} = State) ->
    IncrementalChannels = list_to_integer(config("incremental_channels", "80")),
    BatchChannels = list_to_integer(config("batch_channels", "20")),
    TotalChannels = IncrementalChannels + BatchChannels,
    case queue:out(Q) of
        {{value, DbName}, Q2} ->
            case skip_job(DbName) of
                true ->
                    % job is either being resubmitted or ignored, skip it
                    ets:delete(ken_pending, DbName),
                    maybe_start_next_queued_job(State#state{q = Q2});
                false ->
                    start_db_worker(DbName, Q2, TotalChannels, State)
            end;
        {empty, Q} ->
            State
    end.

% Spawns the monitored examiner process for DbName when fewer than
% TotalChannels index workers are active; otherwise puts DbName back at the
% front of the queue (it stays marked as pending).
start_db_worker(DbName, Q2, TotalChannels, State) ->
    case get_active_count() < TotalChannels of
        true ->
            Args = [DbName, State],
            {Pid, _Ref} = spawn_monitor(?MODULE, update_db_indexes, Args),
            ets:delete(ken_pending, DbName),
            State#state{dbworker = {DbName,Pid}, q = Q2};
        false ->
            State#state{q = queue:in_r(DbName, Q2)}
    end.

% A shard is skipped when a resubmission is already scheduled for it or when
% it is listed in the "ken.ignore" config section.
skip_job(DbName) ->
    ets:member(ken_resubmit, DbName) orelse ignore_db(DbName).

ignore_db(DbName) ->
    case config:get("ken.ignore", ?b2l(DbName), false) of
        "true" ->
            true;
        _ ->
            false
    end.

% Number of ken_workers entries whose worker_pid is a pid (i.e. running).
get_active_count() ->
    MatchSpec = [{#job{worker_pid='$1', _='_'}, [{is_pid, '$1'}], [true]}],
    ets:select_count(ken_workers, MatchSpec).

% If any indexing job fails, resubmit requests for all indexes.
% Runs in its own process (see the spawn export); exits with `resubmit` when
% at least one design document could not have all its jobs started.
update_db_indexes(Name, State) ->
    {ok, DDocs} = design_docs(Name),
    % Visit the design documents in random order. `rand` seeds itself, which
    % replaces the deprecated random:seed(now()) / random:uniform() pair.
    RandomSorted = lists:sort([{rand:uniform(), D} || D <- DDocs]),
    Resubmit = lists:foldl(fun({_, DDoc}, Acc) ->
        JsonDDoc = couch_doc:from_json_obj(DDoc),
        case update_ddoc_indexes(Name, JsonDDoc, State) of
            ok -> Acc;
            _ -> true
        end
    end, false, RandomSorted),
    if Resubmit -> exit(resubmit); true -> ok end.

% Fetches the design documents of the clustered database the shard belongs
% to. A database in maintenance mode or a missing database yields no docs.
design_docs(Name) ->
    try
        case fabric:design_docs(mem3:dbname(Name)) of
            {error, {maintenance_mode, _, _Node}} ->
                {ok, []};
            Else ->
                Else
        end
    catch error:database_does_not_exist ->
        {ok, []}
    end.

% Returns an error if any job creation fails.
% Reads the shard's current update sequence and tries to start view, search
% and spatial index jobs for the design document. Returns `ok` when all three
% kinds report `ok`, otherwise `resubmit`. Exits with the open error when the
% shard cannot be opened.
update_ddoc_indexes(Name, #doc{}=Doc, State) ->
    {ok, Db} = case couch_db:open_int(Name, []) of
        {ok, _} = Resp -> Resp;
        Else -> exit(Else)
    end,
    Seq = couch_db:get_update_seq(Db),
    couch_db:close(Db),
    ViewUpdated = case should_update(Doc, <<"views">>) of
        true ->
            % Best effort: a design document that cannot be turned into a
            % view group is skipped rather than retried.
            try couch_mrview_util:ddoc_to_mrst(Name, Doc) of
                {ok, MRSt} -> update_ddoc_views(Name, MRSt, Seq, State)
            catch _:_ ->
                ok
            end;
        false ->
            ok
    end,
    SearchUpdated = search_updated(Name, Doc, Seq, State),
    STUpdated = st_updated(Name, Doc, Seq, State),
    case {ViewUpdated, SearchUpdated, STUpdated} of
        {ok, ok, ok} -> ok;
        _ -> resubmit
    end.

% The build defines HAVE_DREYFUS / HAVE_HASTINGS (see rebar.config.script and
% the other -ifdef guards in this module), so those are the macros tested
% here. The shard name is passed in explicitly because the index update
% functions need it.
-ifdef(HAVE_DREYFUS).
search_updated(Name, Doc, Seq, State) ->
    case should_update(Doc, <<"indexes">>) of
        true ->
            % Best effort: unparsable search index definitions are skipped.
            try dreyfus_index:design_doc_to_indexes(Doc) of
                SIndexes -> update_ddoc_search_indexes(Name, SIndexes, Seq, State)
            catch _:_ ->
                ok
            end;
        false ->
            ok
    end.
-else.
search_updated(_Name, _Doc, _Seq, _State) ->
    ok.
-endif.

-ifdef(HAVE_HASTINGS).
st_updated(Name, Doc, Seq, State) ->
    case should_update(Doc, <<"st_indexes">>) of
        true ->
            % Best effort: unparsable spatial index definitions are skipped.
            try hastings_index:design_doc_to_indexes(Doc) of
                STIndexes -> update_ddoc_st_indexes(Name, STIndexes, Seq, State)
            catch _:_ ->
                ok
            end;
        false ->
            ok
    end.
-else.
st_updated(_Name, _Doc, _Seq, _State) ->
    ok.
-endif.

% Decides from the design document's "autoupdate" member whether indexes of
% IndexType should be built automatically:
%   "autoupdate": false           -> never
%   "autoupdate": {IndexType: false} -> not for this index type
%   anything else (or absent)     -> yes
should_update(#doc{body={Props}}, IndexType) ->
    case couch_util:get_value(<<"autoupdate">>, Props) of
        false ->
            false;
        {AUProps} ->
            case couch_util:get_value(IndexType, AUProps) of
                false ->
                    false;
                _ ->
                    true
            end;
        _ ->
            true
    end.

% Starts (maybe) a job for the view group described by MRSt, provided its
% language has a configured query server and it defines at least one view.
% Returns whatever maybe_start_job/4 returns, or `ok` when nothing is to do.
update_ddoc_views(Name, MRSt, Seq, State) ->
    Lang = couch_mrview_index:get(language, MRSt),
    IsAllowed = lists:member(Lang, allowed_languages()),
    case couch_mrview_index:get(views, MRSt) of
        Views when IsAllowed, Views =/= [] ->
            {ok, IndexPid} = couch_index_server:get_index(couch_mrview_index, MRSt),
            GroupName = couch_mrview_index:get(idx_name, MRSt),
            maybe_start_job({Name, GroupName}, IndexPid, Seq, State);
        _ ->
            ok
    end.

-ifdef(HAVE_DREYFUS).
% Spawn a job for each search index in the ddoc. The result is `resubmit` as
% soon as one index cannot be opened or its job is not started, else `ok`
% (which is also the result for an empty index list).
update_ddoc_search_indexes(DbName, Indexes, Seq, State) ->
    StartJob = fun(#index{name=IName, ddoc_id=DDocName}=Index, Acc) ->
        case dreyfus_index_manager:get_index(DbName, Index) of
            {ok, Pid} ->
                JobName = {DbName, DDocName, IName},
                case maybe_start_job(JobName, Pid, Seq, State) of
                    resubmit -> resubmit;
                    _ -> Acc
                end;
            _ ->
                % If any job fails, retry the db.
                resubmit
        end
    end,
    lists:foldl(StartJob, ok, Indexes).
-endif.

-ifdef(HAVE_HASTINGS).
% Spawn a job for each spatial index in the ddoc; same result convention as
% for search indexes.
% The record name in hastings is #h_idx rather than #index as it is for dreyfus
update_ddoc_st_indexes(DbName, Indexes, Seq, State) ->
    StartJob = fun(#h_idx{ddoc_id=DDocName}=Index, Acc) ->
        case hastings_index_manager:get_index(DbName, Index) of
            {ok, Pid} ->
                JobName = {DbName, DDocName, hastings},
                case maybe_start_job(JobName, Pid, Seq, State) of
                    resubmit -> resubmit;
                    _ -> Acc
                end;
            _ ->
                % If any job fails, retry the db.
                resubmit
        end
    end,
    lists:foldl(StartJob, ok, Indexes).
-endif.

% Decides whether an update job for the given index should be started now.
%
% A = number of currently running workers. "Batch" channels accept any job;
% the remaining ("incremental") channels only accept jobs whose index is
% fewer than max_incremental_updates sequences behind.
%
% * No ken_workers entry for the job yet: start if a batch channel is free,
%   or if any channel is free and the index is close enough to Seq.
% * Entry exists and is idle: start if a batch channel is free and either
%   batch_size new updates arrived or `delay` milliseconds passed since the
%   last trigger; or if any channel is free, the job is incremental and the
%   delay passed.
% * Entry exists with a running worker: do not start.
should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) ->
    Threshold = list_to_integer(config("max_incremental_updates", "1000")),
    IncrementalChannels = list_to_integer(config("incremental_channels", "80")),
    BatchChannels = list_to_integer(config("batch_channels", "20")),
    TotalChannels = IncrementalChannels + BatchChannels,
    A = get_active_count(),
    #state{delay = Delay, batch_size = BS} = State,
    case ets:lookup(ken_workers, Name) of
        [] ->
            if
                A < BatchChannels ->
                    true;
                A < TotalChannels ->
                    is_incremental_update(Name, Pid, Seq, Threshold);
                true ->
                    false
            end;
        [#job{worker_pid = nil, lru = LRU, seq = OldSeq}] ->
            % Milliseconds since the job was last triggered. os:timestamp/0
            % replaces the deprecated erlang:now/0 (same tuple shape).
            DeltaT = timer:now_diff(os:timestamp(), LRU) / 1000,
            if
                A < BatchChannels, (Seq - OldSeq) >= BS ->
                    true;
                A < BatchChannels, DeltaT > Delay ->
                    true;
                A < TotalChannels, (Seq - OldSeq) < Threshold, DeltaT > Delay ->
                    true;
                true ->
                    false
            end;
        _ ->
            false
    end.

% True when the index behind Pid is fewer than Threshold sequences behind Seq.
% The index's current sequence is obtained by asking for sequence 0.
% st_index name has three elements, the third being 'hastings'; this clause
% must precede the generic three-element (search) clause.
is_incremental_update({_, _, hastings}, Pid, Seq, Threshold) ->
    {ok, CurrentSeq} = hastings_index:await(Pid, 0),
    (Seq - CurrentSeq) < Threshold;
% View name has two elements.
is_incremental_update({_,_}, Pid, Seq, Threshold) ->
    % Since seq is 0, couch_index:get_state/2 won't
    % spawn an index update.
    {ok, MRSt} = couch_index:get_state(Pid, 0),
    CurrentSeq = couch_mrview_index:get(update_seq, MRSt),
    (Seq - CurrentSeq) < Threshold;
% Search name has three elements.
is_incremental_update({_,_,_}, Pid, Seq, Threshold) ->
    {ok, _IndexPid, CurrentSeq} = dreyfus_index:await(Pid, 0),
    (Seq - CurrentSeq) < Threshold;
% Should never happen, but if it does, ignore.
is_incremental_update(_, _Pid, _Seq, _Threshold) ->
    false.

% Asks the server to trigger an update of the index if should_start_job/2
% allows it. Returns `ok` (the result of the cast) when a job was requested
% and `resubmit` otherwise.
maybe_start_job(JobName, IndexPid, Seq, State) ->
    Job = #job{
        name = JobName,
        server = IndexPid,
        seq = Seq
    },
    case should_start_job(Job, State) of
        true ->
            gen_server:cast(?MODULE, {trigger_update, Job});
        false ->
            resubmit
    end.
+ +debrief_worker(Pid, Reason, _State) -> + case ets:match_object(ken_workers, #job{worker_pid=Pid, _='_'}) of + [#job{name = Name} = Job] -> + case Name of + {DbName,_} -> + maybe_resubmit(DbName, Reason); + {DbName,_,_} -> + maybe_resubmit(DbName, Reason) + end, + ets:insert(ken_workers, Job#job{worker_pid = nil}); + [] -> % should never happen, but if it does, ignore + ok + end. + +maybe_resubmit(_DbName, normal) -> + ok; +maybe_resubmit(_DbName, {database_does_not_exist, _}) -> + ok; +maybe_resubmit(_DbName, {not_found, no_db_file}) -> + ok; +maybe_resubmit(DbName, resubmit) -> + resubmit(60000, DbName); +maybe_resubmit(DbName, _) -> + resubmit(5000, DbName). + +resubmit(Delay, DbName) -> + case ets:insert_new(ken_resubmit, {DbName}) of + true -> + erlang:send_after(Delay, ?MODULE, {'$gen_cast', {resubmit, DbName}}); + false -> + ok + end. + +prune_worker_table(State) -> + {A, B, _} = now(), + C = (1000000 * A) + B - 0.001 * State#state.delay, + MatchHead = #job{worker_pid=nil, lru={'$1','$2','_'}, _='_'}, + Guard = {'<', {'+', {'*', '$1', 1000000}, '$2'}, C}, + ets:select_delete(ken_workers, [{MatchHead, [Guard], [true]}]), + State#state{pruned_last = now()}. + +allowed_languages() -> + Config = config:get("query_servers") ++ config:get("native_query_servers"), + [list_to_binary(Lang) || {Lang, _Cmd} <- Config]. + +config(Key, Default) -> + config:get("ken", Key, Default). diff --git a/src/ken_sup.erl b/src/ken_sup.erl new file mode 100644 index 000000000..fd08cfd11 --- /dev/null +++ b/src/ken_sup.erl @@ -0,0 +1,33 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ken_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% supervisor callbacks + +init([]) -> + {ok, { {one_for_one, 5, 10}, [?CHILD(ken_server, worker)]} }. + diff --git a/test/config.ini b/test/config.ini new file mode 100644 index 000000000..a28eae4c0 --- /dev/null +++ b/test/config.ini @@ -0,0 +1,2 @@ +[ken] +limit = 42 diff --git a/test/ken_server_test.erl b/test/ken_server_test.erl new file mode 100644 index 000000000..1d2af7a4f --- /dev/null +++ b/test/ken_server_test.erl @@ -0,0 +1,99 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ken_server_test). + +-compile([export_all]). + +-include_lib("eunit/include/eunit.hrl"). 

%% hardcoded defaults: limit: 20; batch: 1; delay: 5000; prune: 60000
%% Each setter must return the previously configured value, starting from
%% the hardcoded default. The cases share one server, so order matters.
default_test_() ->
    Cases = [
        {"returns default", set_limit, 12, 20},
        {"keeps set", set_limit, 6, 12},
        {"returns default", set_batch_size, 3, 1},
        {"keeps set", set_batch_size, 6, 3},
        {"returns default", set_delay, 7000, 5000},
        {"keeps set", set_delay, 10000, 7000},
        {"returns default", set_prune_interval, 70000, 60000},
        {"keeps set", set_prune_interval, 80000, 70000}
    ],
    Tests = [set_builder(Desc, F, In, Out) || {Desc, F, In, Out} <- Cases],
    {inorder, {setup, fun setup_default/0, fun teardown/1, Tests}}.

%% Out-of-range arguments must be rejected by the setters' guards; a zero
%% delay is the one boundary value that is accepted.
exception_test_() ->
    Tests = [
        exception_builder("exception on zero", set_limit, 0),
        exception_builder("exception on negative", set_limit, -12),
        exception_builder("exception on zero", set_batch_size, 0),
        exception_builder("exception on negative", set_batch_size, -12),
        set_builder("no exception on zero", set_delay, 0, 5000),
        exception_builder("exception on negative", set_delay, -12),
        exception_builder("exception on zero", set_prune_interval, 0),
        exception_builder("exception on negative", set_prune_interval, -12)
    ],
    {inorder, {foreach, fun setup_default/0, fun teardown/1, Tests}}.

%% With test/config.ini loaded the initial limit comes from the config file.
config_test_() ->
    Tests = [
        set_builder("reads config", set_limit, 24, 42),
        set_builder("keeps set", set_limit, 6, 24)
    ],
    {inorder, {setup, fun setup_config/0, fun teardown/1, Tests}}.

%% Starts the event server, the config server (without ini files) and ken.
setup_default() ->
    start_servers([]).

%% Same as setup_default/0 but the config server loads ../test/config.ini.
setup_config() ->
    {ok, Cwd} = file:get_cwd(),
    IniFile = filename:join([Cwd, "..", "test", "config.ini"]),
    start_servers([[IniFile]]).

%% Starts the three servers in dependency order and returns their pids as a
%% key/value list for teardown/1. ConfigArgs is passed to the config server.
start_servers(ConfigArgs) ->
    {ok, EventPid} = start_server(couch_event_server),
    {ok, CfgPid} = start_server(config, ConfigArgs),
    {ok, KenPid} = start_server(ken_server),
    [{ken_pid, KenPid}, {cfg_pid, CfgPid}, {event_pid, EventPid}].

%% Kills the servers started by the setup functions, event server first.
teardown(Cfg) ->
    lists:foreach(
        fun(Key) -> ok = stop_server(Key, Cfg) end,
        [event_pid, cfg_pid, ken_pid]
    ).

%% Builds a titled test asserting that ken_server:F(Val) is rejected with a
%% function_clause error.
exception_builder(Desc, F, Val) ->
    Title = string:join([atom_to_list(F), Desc], " "),
    {Title, ?_assertException(error, function_clause, ken_server:F(Val))}.

%% Builds a titled test asserting that ken_server:F(In) returns Out.
set_builder(Desc, F, In, Out) ->
    Title = string:join([atom_to_list(F), Desc], " "),
    {Title, ?_assertEqual(Out, ken_server:F(In))}.

%% Starts Module as a locally registered, unlinked gen_server.
start_server(Module) ->
    start_server(Module, []).

start_server(Module, Config) ->
    gen_server:start({local, Module}, Module, Config, []).

%% Kills the process stored under Key in Cfg and waits until it is down.
%% The monitor is set up before the kill so the 'DOWN' message cannot be
%% missed.
stop_server(Key, Cfg) ->
    {Key, Pid} = lists:keyfind(Key, 1, Cfg),
    Monitor = erlang:monitor(process, Pid),
    true = exit(Pid, kill),
    receive
        {'DOWN', Monitor, _, _, _} -> ok
    end.