summaryrefslogtreecommitdiff
path: root/src/ken/src/ken_server.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ken/src/ken_server.erl')
-rw-r--r--src/ken/src/ken_server.erl565
1 files changed, 0 insertions, 565 deletions
diff --git a/src/ken/src/ken_server.erl b/src/ken/src/ken_server.erl
deleted file mode 100644
index b33d01f35..000000000
--- a/src/ken/src/ken_server.erl
+++ /dev/null
@@ -1,565 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(ken_server).
-
-% gen_server boilerplate
--behaviour(gen_server).
--vsn(1).
--export([init/1, terminate/2]).
--export([handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
-
-% Public interface
--export([start_link/0]).
--export([add/1]).
--export([remove/1]).
--export([add_all_shards/1]).
--export([set_batch_size/1]).
--export([set_delay/1]).
--export([set_limit/1]).
--export([set_prune_interval/1]).
-
-% exports for spawn
--export([update_db_indexes/2]).
-
--record(job, { % One row of the ken_workers ETS table (keyed on #job.name)
- name, % {DbName, GroupId} for view. {DbName, DDocId, IndexId} for search. {DbName, DDocId, hastings} for spatial.
- server, % Pid of either view group or search index
- worker_pid = nil, % pid of the monitored process driving the update; nil when none is running
- seq = 0, % database update sequence read when the job was created
- lru = erlang:monotonic_time() % native-unit timestamp; refreshed whenever a worker is spawned for the job
-}).
-
--record(state, { % gen_server state of ken_server
- q = queue:new(), % queue of database names waiting to be examined
- dbworker = nil, % nil, or {DbName, Pid} of the running update_db_indexes/2 process
- limit = 20, % overridden in init/1 from the "ken" config key "limit"
- delay = 5000, % milliseconds; job age threshold in should_start_job/2 and prune_worker_table/1
- batch_size = 1, % sequence delta at which an idle job may restart (see should_start_job/2)
- prune_interval = 60000, % milliseconds; gen_server timeout returned after a timeout message
- pruned_last % erlang:monotonic_time() of the last prune; initialised in init/1
-}).
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("mem3/include/mem3.hrl").
-
--ifdef(HAVE_DREYFUS).
--include_lib("dreyfus/include/dreyfus.hrl").
--endif.
-
--ifdef(HAVE_HASTINGS).
--include_lib("hastings/src/hastings.hrl").
--endif.
-
-%% @doc Starts the server linked to the caller and registers it locally
-%% under the module name.
-start_link() ->
-    ServerName = {local, ?MODULE},
-    gen_server:start_link(ServerName, ?MODULE, [], []).
-
-%% @doc Adds a database shard to be indexed.
-%% The request is an asynchronous cast to the locally registered server.
--spec add(binary()) -> ok.
-add(DbName) ->
-    Request = {add, DbName},
-    gen_server:cast(?MODULE, Request).
-
-%% @doc Removes all the pending jobs for a database shard.
-%% The request is an asynchronous cast to the locally registered server.
--spec remove(binary()) -> ok.
-remove(DbName) ->
-    Request = {remove, DbName},
-    gen_server:cast(?MODULE, Request).
-
-%% @doc Adds all the shards for a database to be indexed.
-%% Casts `ken_server:add/1' through rexi to the node of every shard of the
-%% database. A database that does not exist is silently ignored.
--spec add_all_shards(binary()) -> ok.
-add_all_shards(DbName) ->
-    try
-        Shards = mem3:shards(mem3:dbname(DbName)),
-        % foreach rather than map: the casts are pure side effects, and the
-        % declared result is `ok', not a list of cast results.
-        lists:foreach(fun(#shard{node = Node, name = ShardName}) ->
-            rexi:cast(Node, {ken_server, add, [ShardName]})
-        end, Shards)
-    catch error:database_does_not_exist ->
-        ok
-    end.
-
-%% @doc Changes the configured value for a batch size.
-%% Accepts only positive integers. Returns previous value.
--spec set_batch_size(pos_integer()) -> pos_integer().
-set_batch_size(BS) when is_integer(BS), BS > 0 ->
-    Request = {set_batch_size, BS},
-    gen_server:call(?MODULE, Request).
-
-%% @doc Changes the configured value for a delay between batches.
-%% Accepts only non-negative integers. Returns previous value.
--spec set_delay(non_neg_integer()) -> non_neg_integer().
-set_delay(Delay) when is_integer(Delay), Delay >= 0 ->
-    Request = {set_delay, Delay},
-    gen_server:call(?MODULE, Request).
-
-%% @doc Changes the configured value for a limit.
-%% Accepts only positive integers. Returns previous value.
--spec set_limit(pos_integer()) -> pos_integer().
-set_limit(Limit) when is_integer(Limit), Limit > 0 ->
-    Request = {set_limit, Limit},
-    gen_server:call(?MODULE, Request).
-
-%% @doc Changes the configured value for a prune interval.
-%% The guard only admits integers strictly greater than 1000; any other
-%% argument fails with function_clause. Returns previous value.
--spec set_prune_interval(pos_integer()) -> pos_integer().
-set_prune_interval(Interval) when is_integer(Interval), Interval > 1000 ->
-    Request = {set_prune_interval, Interval},
-    gen_server:call(?MODULE, Request).
-
-%% gen_server callbacks
-
-%% Creates the three named ETS tables used by the server, schedules the
-%% event handler start-up by messaging itself, and reads the initial limit
-%% from the "ken" config section.
-init(_) ->
-    self() ! start_event_handler,
-    ets:new(ken_pending, [named_table]),
-    ets:new(ken_resubmit, [named_table]),
-    % ken_workers is public and keyed on the job name so that spawned
-    % processes can read it too.
-    ets:new(ken_workers, [named_table, public, {keypos, #job.name}]),
-    ConfiguredLimit = list_to_integer(config("limit", "20")),
-    InitialState = #state{
-        pruned_last = erlang:monotonic_time(),
-        limit = ConfiguredLimit
-    },
-    {ok, InitialState}.
-
-%% Nothing to clean up on shutdown; both arguments are ignored.
-terminate(_Reason, _State) -> ok.
-
-%% Synchronous setters. Each one stores the new value, replies with the value
-%% it replaced and returns a zero timeout, so a `timeout' message is handled
-%% right afterwards. Any other request stops the server.
-handle_call({set_batch_size, BS}, _From, #state{} = State) ->
-    Previous = State#state.batch_size,
-    {reply, Previous, State#state{batch_size = BS}, 0};
-
-handle_call({set_delay, Delay}, _From, #state{} = State) ->
-    Previous = State#state.delay,
-    {reply, Previous, State#state{delay = Delay}, 0};
-
-handle_call({set_limit, Limit}, _From, #state{} = State) ->
-    Previous = State#state.limit,
-    {reply, Previous, State#state{limit = Limit}, 0};
-
-handle_call({set_prune_interval, Interval}, _From, State) ->
-    Previous = State#state.prune_interval,
-    Updated = State#state{prune_interval = Interval},
-    {reply, Previous, Updated, 0};
-
-handle_call(Msg, From, State) ->
-    StopReason = {unknown_call, Msg, From},
-    {stop, StopReason, State}.
-
-% Queues a DB to (maybe) have indexing jobs spawned. The ken_pending table
-% de-duplicates requests: a name is queued only if it was not pending already.
-handle_cast({add, DbName}, State) ->
-    NextState =
-        case ets:insert_new(ken_pending, {DbName}) of
-            true -> State#state{q = queue:in(DbName, State#state.q)};
-            false -> State
-        end,
-    {noreply, NextState, 0};
-
-% Drops a DB from the queue, the pending table and the worker table.
-handle_cast({remove, DbName}, State) ->
-    Remaining = queue:filter(fun(Queued) -> Queued =/= DbName end, State#state.q),
-    ets:delete(ken_pending, DbName),
-    % Delete search index workers
-    SearchJobs = #job{name = {DbName, '_', '_'}, _ = '_'},
-    ets:match_delete(ken_workers, SearchJobs),
-    % Delete view index workers
-    ViewJobs = #job{name = {DbName, '_'}, _ = '_'},
-    ets:match_delete(ken_workers, ViewJobs),
-    % TODO kill off active jobs for this DB as well
-    {noreply, State#state{q = Remaining}, 0};
-
-% A delayed retry: clear the resubmit marker, then treat it as a fresh add.
-handle_cast({resubmit, DbName}, State) ->
-    ets:delete(ken_resubmit, DbName),
-    handle_cast({add, DbName}, State);
-
-% st index job names have 3 elements, 3rd being 'hastings'. See job record definition.
-% hastings_index:await will trigger a hastings index update.
-handle_cast({trigger_update, #job{name = {_, _, hastings}} = Job}, State) ->
-    track_worker(hastings_index, await, Job),
-    {noreply, State, 0};
-% search index job names have 3 elements. See job record definition.
-% dreyfus_index:await will trigger a search index update.
-handle_cast({trigger_update, #job{name = {_, _, _}} = Job}, State) ->
-    track_worker(dreyfus_index, await, Job),
-    {noreply, State, 0};
-% couch_index:get_state/2 will trigger a view group index update.
-handle_cast({trigger_update, #job{name = {_, _}} = Job}, State) ->
-    track_worker(couch_index, get_state, Job),
-    {noreply, State, 0};
-
-handle_cast(Msg, State) ->
-    {stop, {unknown_cast, Msg}, State}.
-
-% Spawns a monitored process running Module:Function(Server, Seq) and records
-% it in ken_workers together with a fresh lru timestamp.
-track_worker(Module, Function, #job{server = Server, seq = Seq} = Job) ->
-    {Worker, _MonitorRef} = erlang:spawn_monitor(Module, Function, [Server, Seq]),
-    StartedAt = erlang:monotonic_time(),
-    ets:insert(ken_workers, Job#job{worker_pid = Worker, lru = StartedAt}).
-
-%% Handles raw messages: event handler lifecycle, the gen_server timeout that
-%% drives pruning and queue processing, and 'DOWN' notifications from the
-%% monitored db worker and index workers. Any other message stops the server.
-
-% The event handler died: log it and retry its start-up in five seconds.
-handle_info({gen_event_EXIT, ken_event_handler, Reason}, State) ->
-    couch_log:error("ken_event_handler terminated: ~w", [Reason]),
-    erlang:send_after(5000, self(), start_event_handler),
-    % handle_info/2 has to answer with a noreply/stop tuple; any other shape
-    % stops the server with a bad_return_value error.
-    {noreply, State, 0};
-
-% (Re)starts the event handler; on failure, logs and retries in five seconds.
-handle_info(start_event_handler, State) ->
-    case ken_event_handler:start_link() of
-        {ok, _Pid} ->
-            ok;
-        Error ->
-            couch_log:error("ken_event_handler init: ~w", [Error]),
-            erlang:send_after(5000, self(), start_event_handler)
-    end,
-    {noreply, State, 0};
-
-% Fired whenever a callback returned a timeout that elapsed without another
-% message arriving. Prunes the worker table if enough time has passed, tries
-% to start the next queued db, then waits up to prune_interval milliseconds.
-handle_info(timeout, #state{prune_interval = I, pruned_last = Last} = State) ->
-    Now = erlang:monotonic_time(),
-    % NOTE(review): the elapsed-time check uses `delay', not `prune_interval';
-    % confirm this is intended before changing it.
-    Interval = erlang:convert_time_unit(
-        State#state.delay, millisecond, native),
-    NewState =
-        case Now - Last > Interval of
-            true -> prune_worker_table(State);
-            _ -> State
-        end,
-    {noreply, maybe_start_next_queued_job(NewState), I};
-
-% The per-db worker finished: maybe schedule a retry and free the slot.
-handle_info({'DOWN', _, _, Pid, Reason}, #state{dbworker = {Name, Pid}} = St) ->
-    maybe_resubmit(Name, Reason),
-    {noreply, St#state{dbworker = nil}, 0};
-
-% Any other monitored process is looked up as an index worker.
-handle_info({'DOWN', _, _, Pid, Reason}, State) ->
-    debrief_worker(Pid, Reason, State),
-    {noreply, State, 0};
-
-handle_info(Msg, State) ->
-    {stop, {unknown_info, Msg}, State}.
-
-%% No state migration is needed on code upgrade; the state is passed through.
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
-
-%% private functions
-
-% Starts a db worker for the next queued database, unless one is already
-% running. Returns the (possibly updated) state.
-maybe_start_next_queued_job(#state{dbworker = {_, _}} = State) ->
-    % Only one db worker at a time.
-    State;
-maybe_start_next_queued_job(#state{q = Queue} = State) ->
-    IncrementalSlots = list_to_integer(config("incremental_channels", "80")),
-    BatchSlots = list_to_integer(config("batch_channels", "20")),
-    MaxActive = IncrementalSlots + BatchSlots,
-    case queue:out(Queue) of
-        {empty, _} ->
-            State;
-        {{value, DbName}, Rest} ->
-            dispatch_queued(DbName, Rest, MaxActive, State)
-    end.
-
-% Decides what to do with the database just taken from the head of the queue.
-dispatch_queued(DbName, Rest, MaxActive, State) ->
-    case skip_job(DbName) of
-        true ->
-            % job is either being resubmitted or ignored, skip it
-            ets:delete(ken_pending, DbName),
-            maybe_start_next_queued_job(State#state{q = Rest});
-        false ->
-            case get_active_count() < MaxActive of
-                true ->
-                    {Pid, _} = spawn_monitor(
-                        ?MODULE, update_db_indexes, [DbName, State]),
-                    ets:delete(ken_pending, DbName),
-                    State#state{dbworker = {DbName, Pid}, q = Rest};
-                false ->
-                    % All channels busy: put the database back at the front.
-                    State#state{q = queue:in_r(DbName, Rest)}
-            end
-    end.
-
-% A queued database is skipped when a resubmit is already scheduled for it or
-% when it is listed as ignored in the configuration.
-skip_job(DbName) ->
-    case ets:member(ken_resubmit, DbName) of
-        true -> true;
-        false -> ignore_db(DbName)
-    end.
-
-% True only when the "ken.ignore" config section maps this database name to
-% the string "true".
-ignore_db(DbName) ->
-    config:get("ken.ignore", ?b2l(DbName), false) =:= "true".
-
-% Counts the ken_workers entries whose worker_pid is an actual pid, i.e. the
-% jobs that currently have a worker attached.
-get_active_count() ->
-    BusyJob = #job{worker_pid = '$1', _ = '_'},
-    ets:select_count(ken_workers, [{BusyJob, [{is_pid, '$1'}], [true]}]).
-
-% If any indexing job fails, resubmit requests for all indexes.
-%
-% Visits the design documents of the database in random order and asks for
-% index updates on each one. Returns ok when every design document reported
-% ok; otherwise exits with reason `resubmit'.
-update_db_indexes(Name, State) ->
-    {ok, DDocs} = design_docs(Name),
-    % Shuffle by sorting on a random key.
-    Shuffled = lists:sort([{rand:uniform(), DDoc} || DDoc <- DDocs]),
-    Results = [
-        update_ddoc_indexes(Name, couch_doc:from_json_obj(DDoc), State)
-        || {_, DDoc} <- Shuffled
-    ],
-    case lists:all(fun(Result) -> Result =:= ok end, Results) of
-        true -> ok;
-        false -> exit(resubmit)
-    end.
-
-% Fetches the design documents of the clustered database that owns this
-% shard. A node in maintenance mode or a missing database yields {ok, []};
-% every other result of fabric:design_docs/1 is returned unchanged.
-design_docs(Name) ->
-    try fabric:design_docs(mem3:dbname(Name)) of
-        {error, {maintenance_mode, _, _Node}} ->
-            {ok, []};
-        Other ->
-            Other
-    catch
-        error:database_does_not_exist ->
-            {ok, []}
-    end.
-
-% Returns an error if any job creation fails.
-%
-% Reads the current update sequence of the shard, then asks for view, search
-% and spatial index updates for the design document. Returns ok when all
-% three report ok and `resubmit' otherwise. Exits with the open result when
-% the shard cannot be opened.
-update_ddoc_indexes(Name, #doc{} = Doc, State) ->
-    Db =
-        case couch_db:open_int(Name, []) of
-            {ok, Opened} -> Opened;
-            OpenError -> exit(OpenError)
-        end,
-    Seq = couch_db:get_update_seq(Db),
-    couch_db:close(Db),
-    Views = views_updated(Name, Doc, Seq, State),
-    Search = search_updated(Name, Doc, Seq, State),
-    Spatial = st_updated(Name, Doc, Seq, State),
-    case lists:usort([Views, Search, Spatial]) of
-        [ok] -> ok;
-        _ -> resubmit
-    end.
-
-% View counterpart of search_updated/4 and st_updated/4. A design document
-% that cannot be turned into a view state is treated as ok.
-views_updated(Name, Doc, Seq, State) ->
-    case should_update(Doc, <<"views">>) of
-        false ->
-            ok;
-        true ->
-            try couch_mrview_util:ddoc_to_mrst(Name, Doc) of
-                {ok, MRSt} -> update_ddoc_views(Name, MRSt, Seq, State)
-            catch
-                _:_ -> ok
-            end
-    end.
-
--ifdef(HAVE_DREYFUS).
-% Requests updates for the search indexes of a design document, unless its
-% autoupdate settings disable "indexes". A design document whose index
-% definitions cannot be extracted is treated as ok.
-search_updated(Name, Doc, Seq, State) ->
-    case should_update(Doc, <<"indexes">>) of
-        false ->
-            ok;
-        true ->
-            try dreyfus_index:design_doc_to_indexes(Doc) of
-                SIndexes ->
-                    update_ddoc_search_indexes(Name, SIndexes, Seq, State)
-            catch
-                _:_ -> ok
-            end
-    end.
--else.
-% Built without dreyfus: there are no search indexes to update.
-search_updated(_Name, _Doc, _Seq, _State) -> ok.
--endif.
-
--ifdef(HAVE_HASTINGS).
-% Requests updates for the spatial indexes of a design document, unless its
-% autoupdate settings disable "st_indexes". A design document whose index
-% definitions cannot be extracted is treated as ok.
-st_updated(Name, Doc, Seq, State) ->
-    case should_update(Doc, <<"st_indexes">>) of
-        false ->
-            ok;
-        true ->
-            try hastings_index:design_doc_to_indexes(Doc) of
-                STIndexes ->
-                    update_ddoc_st_indexes(Name, STIndexes, Seq, State)
-            catch
-                _:_ -> ok
-            end
-    end.
--else.
-% Built without hastings: there are no spatial indexes to update.
-st_updated(_Name, _Doc, _Seq, _State) -> ok.
--endif.
-
-% Tells whether indexes of type IndexType in this design document should be
-% updated, based on the document's "autoupdate" member.
-should_update(#doc{body = {Props}}, IndexType) ->
-    autoupdate_allows(couch_util:get_value(<<"autoupdate">>, Props), IndexType).
-
-% `false' turns every index type off; an object may turn off individual
-% types by mapping them to `false'; anything else (including a missing
-% member) leaves updates on.
-autoupdate_allows(false, _IndexType) ->
-    false;
-autoupdate_allows({AUProps}, IndexType) ->
-    couch_util:get_value(IndexType, AUProps) =/= false;
-autoupdate_allows(_Other, _IndexType) ->
-    true.
-
-% Requests a view group update when the design document's language is one of
-% the allowed languages and it defines at least one view. Returns the result
-% of maybe_start_job/4, or ok when nothing has to be done.
-update_ddoc_views(Name, MRSt, Seq, State) ->
-    Language = couch_mrview_index:get(language, MRSt),
-    LanguageOk = lists:member(Language, allowed_languages()),
-    Views = couch_mrview_index:get(views, MRSt),
-    case LanguageOk andalso Views =/= [] of
-        true ->
-            {ok, Pid} = couch_index_server:get_index(couch_mrview_index, MRSt),
-            GroupName = couch_mrview_index:get(idx_name, MRSt),
-            maybe_start_job({Name, GroupName}, Pid, Seq, State);
-        false ->
-            ok
-    end.
-
--ifdef(HAVE_DREYFUS).
-% Spawn a job for each search index in the ddoc. Returns `resubmit' as soon
-% as the last failing index is seen, otherwise ok. Folding over an empty
-% list simply yields the initial ok.
-update_ddoc_search_indexes(DbName, Indexes, Seq, State) ->
-    StartJob = fun(#index{name = IName, ddoc_id = DDocName} = Index, Acc) ->
-        case dreyfus_index_manager:get_index(DbName, Index) of
-            {ok, Pid} ->
-                JobName = {DbName, DDocName, IName},
-                case maybe_start_job(JobName, Pid, Seq, State) of
-                    resubmit -> resubmit;
-                    _ -> Acc
-                end;
-            _ ->
-                % If any job fails, retry the db.
-                resubmit
-        end
-    end,
-    lists:foldl(StartJob, ok, Indexes).
--endif.
-
--ifdef(HAVE_HASTINGS).
-% Spawn a job for each spatial index in the ddoc. Folding over an empty list
-% simply yields the initial ok.
-% The record name in hastings is #h_idx rather than #index as it is for dreyfus
-update_ddoc_st_indexes(DbName, Indexes, Seq, State) ->
-    StartJob = fun(#h_idx{ddoc_id = DDocName} = Index, Acc) ->
-        case hastings_index_manager:get_index(DbName, Index) of
-            {ok, Pid} ->
-                JobName = {DbName, DDocName, hastings},
-                case maybe_start_job(JobName, Pid, Seq, State) of
-                    resubmit -> resubmit;
-                    _ -> Acc
-                end;
-            _ ->
-                % If any job fails, retry the db.
-                resubmit
-        end
-    end,
-    lists:foldl(StartJob, ok, Indexes).
--endif.
-
-% Decides whether an update worker may be started for the job right now.
-%
-% A job that is not yet in ken_workers starts when a batch channel is free,
-% or when any channel is free and the index lags by fewer than
-% max_incremental_updates sequences. A known job without a worker starts
-% depending on free channels, how far the database has moved since the job
-% was last recorded and how long ago that was. A job that still has a worker
-% never starts again.
-should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) ->
-    Threshold = list_to_integer(config("max_incremental_updates", "1000")),
-    IncrementalSlots = list_to_integer(config("incremental_channels", "80")),
-    BatchSlots = list_to_integer(config("batch_channels", "20")),
-    AllSlots = IncrementalSlots + BatchSlots,
-    Active = get_active_count(),
-    #state{delay = Delay, batch_size = BS} = State,
-    case ets:lookup(ken_workers, Name) of
-        [] when Active < BatchSlots ->
-            true;
-        [] when Active < AllSlots ->
-            index_lag_below(Threshold, Name, Pid, Seq);
-        [] ->
-            false;
-        [#job{worker_pid = nil, lru = LRU, seq = OldSeq}] ->
-            Now = erlang:monotonic_time(),
-            IdleMs = erlang:convert_time_unit(Now - LRU, native, millisecond),
-            if
-                Active < BatchSlots, (Seq - OldSeq) >= BS;
-                Active < BatchSlots, IdleMs > Delay;
-                Active < AllSlots, (Seq - OldSeq) < Threshold, IdleMs > Delay ->
-                    true;
-                true ->
-                    false
-            end;
-        _ ->
-            false
-    end.
-
-% True when the index behind Pid is fewer than Threshold sequences behind
-% Seq. The shape of the job name selects which index module is asked.
-% st_index name has three elements, the third being 'hastings'.
-index_lag_below(Threshold, {_, _, hastings}, Pid, Seq) ->
-    {ok, CurrentSeq} = hastings_index:await(Pid, 0),
-    (Seq - CurrentSeq) < Threshold;
-% View name has two elements.
-index_lag_below(Threshold, {_, _}, Pid, Seq) ->
-    % Since seq is 0, couch_index:get_state/2 won't
-    % spawn an index update.
-    {ok, MRSt} = couch_index:get_state(Pid, 0),
-    CurrentSeq = couch_mrview_index:get(update_seq, MRSt),
-    (Seq - CurrentSeq) < Threshold;
-% Search name has three elements.
-index_lag_below(Threshold, {_, _, _}, Pid, Seq) ->
-    {ok, _IndexPid, CurrentSeq} = dreyfus_index:await(Pid, 0),
-    (Seq - CurrentSeq) < Threshold;
-% Should never happen, but if it does, ignore.
-index_lag_below(_Threshold, _Name, _Pid, _Seq) ->
-    false.
-
-% Builds a job for the given index and, if should_start_job/2 allows it, asks
-% the server to spawn its update worker. Returns the result of the cast when
-% the job was handed over and `resubmit' when it has to wait.
-maybe_start_job(JobName, IndexPid, Seq, State) ->
-    Candidate = #job{name = JobName, server = IndexPid, seq = Seq},
-    Startable = should_start_job(Candidate, State),
-    case Startable of
-        false -> resubmit;
-        true -> gen_server:cast(?MODULE, {trigger_update, Candidate})
-    end.
-
-% Called when a monitored index worker went down: schedules a retry for its
-% database if the exit reason calls for one and marks the job as idle.
-debrief_worker(Pid, Reason, _State) ->
-    OwnedByPid = #job{worker_pid = Pid, _ = '_'},
-    case ets:match_object(ken_workers, OwnedByPid) of
-        [] ->
-            % should never happen, but if it does, ignore
-            ok;
-        [#job{name = Name} = Job] ->
-            % The database name is the first element of both name shapes.
-            DbName =
-                case Name of
-                    {ViewDb, _} -> ViewDb;
-                    {IndexDb, _, _} -> IndexDb
-                end,
-            maybe_resubmit(DbName, Reason),
-            ets:insert(ken_workers, Job#job{worker_pid = nil})
-    end.
-
-% Maps a worker exit reason to a retry policy: clean exits and missing
-% databases need no retry, an explicit `resubmit' retries after 60 seconds,
-% and any other reason retries after 5 seconds.
-maybe_resubmit(DbName, Reason) ->
-    case Reason of
-        normal ->
-            ok;
-        {database_does_not_exist, _} ->
-            ok;
-        {not_found, no_db_file} ->
-            ok;
-        resubmit ->
-            resubmit(60000, DbName);
-        _ ->
-            resubmit(5000, DbName)
-    end.
-
-% Schedules a `resubmit' cast to the server after Delay milliseconds. The
-% ken_resubmit table guarantees at most one scheduled retry per database.
-resubmit(Delay, DbName) ->
-    FirstRequest = ets:insert_new(ken_resubmit, {DbName}),
-    case FirstRequest of
-        false ->
-            ok;
-        true ->
-            Cast = {'$gen_cast', {resubmit, DbName}},
-            erlang:send_after(Delay, ?MODULE, Cast)
-    end.
-
-% Deletes every ken_workers entry that has no worker attached and whose lru
-% timestamp is older than `delay' milliseconds, then records the prune time
-% in the returned state.
-prune_worker_table(State) ->
-    MaxAge = erlang:convert_time_unit(State#state.delay, millisecond, native),
-    Cutoff = erlang:monotonic_time() - MaxAge,
-    % Equivalent to: fun(#job{worker_pid = nil, lru = A}) when A < Cutoff -> true end
-    IdleAndStale = {
-        #job{worker_pid = nil, lru = '$1', _ = '_'},
-        [{'<', '$1', Cutoff}],
-        [true]
-    },
-    ets:select_delete(ken_workers, [IdleAndStale]),
-    State#state{pruned_last = erlang:monotonic_time()}.
-
-% Lists the design document languages whose views may be updated: the
-% lower-cased names of the query servers found under both environment
-% prefixes, plus <<"query">>.
-allowed_languages() ->
-    FromEnv = fun couch_proc_manager:get_servers_from_env/1,
-    Servers =
-        FromEnv("COUCHDB_QUERY_SERVER_") ++
-        FromEnv("COUCHDB_NATIVE_QUERY_SERVER_"),
-    [<<"query">> | [list_to_binary(string:to_lower(Lang)) || {Lang, _Cmd} <- Servers]].
-
-% Reads Key from the "ken" config section, falling back to Default.
-config(Key, Default) ->
-    Section = "ken",
-    config:get(Section, Key, Default).
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
-%% Test generator (note the trailing underscore): EUnit only runs a fixture
-%% tuple when it is returned from a `..._test_' function. Returned from a
-%% plain `..._test' function the tuple is discarded and the test body never
-%% executes.
-%%
-%% The entries get explicit lru timestamps instead of being spaced out with
-%% sleeps, so the test is fast and does not depend on scheduling.
-prune_old_entries_test_() ->
-    {
-        setup,
-        fun() ->
-            ets:new(ken_workers, [named_table, public, {keypos, #job.name}])
-        end,
-        fun(_) ->
-            catch ets:delete(ken_workers)
-        end,
-        ?_test(begin
-            Now = erlang:monotonic_time(),
-            AgedMs = fun(Ms) ->
-                Now - erlang:convert_time_unit(Ms, millisecond, native)
-            end,
-            ets:insert(ken_workers, [
-                #job{name = 1, lru = AgedMs(10000)},
-                #job{name = 2, lru = AgedMs(1000)},
-                #job{name = 3, lru = Now}
-            ]),
-            % Only entry 1 is older than the 5 second cut-off.
-            prune_worker_table(#state{delay = 5000}),
-            ?assertEqual(
-                [2, 3],
-                lists:usort(
-                    [N || #job{name = N} <- ets:tab2list(ken_workers)])
-            ),
-            ok
-        end)
-    }.
-
--endif.