-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-%% A dreyfus_index gen_server is linked to its clouseau twin.
-% public api.
--export([start_link/2, design_doc_to_index/2, await/2, search/2, info/1,
- group1/2, group2/2,
- design_doc_to_indexes/1]).
-% gen_server api.
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3, format_status/2]).
-% private definitions.
--record(state, {
- dbname,
- index,
- updater_pid=nil,
- index_pid=nil,
- waiting_list=[]
-% exported for callback.
--export([search_int/2, group1_int/2, group2_int/2, info_int/1]).
-% public functions.
-start_link(DbName, Index) ->
- proc_lib:start_link(?MODULE, init, [{DbName, Index}]).
-await(Pid, MinSeq) ->
- MFA = {gen_server, call, [Pid, {await, MinSeq}, infinity]},
- dreyfus_util:time([index, await], MFA).
-search(Pid0, QueryArgs) ->
- Pid = to_index_pid(Pid0),
- MFA = {?MODULE, search_int, [Pid, QueryArgs]},
- dreyfus_util:time([index, search], MFA).
-group1(Pid0, QueryArgs) ->
- Pid = to_index_pid(Pid0),
- MFA = {?MODULE, group1_int, [Pid, QueryArgs]},
- dreyfus_util:time([index, group1], MFA).
-group2(Pid0, QueryArgs) ->
- Pid = to_index_pid(Pid0),
- MFA = {?MODULE, group2_int, [Pid, QueryArgs]},
- dreyfus_util:time([index, group2], MFA).
-info(Pid0) ->
- Pid = to_index_pid(Pid0),
- MFA = {?MODULE, info_int, [Pid]},
- dreyfus_util:time([index, info], MFA).
-%% We either have a dreyfus_index gen_server pid or the remote
-%% clouseau pid.
-to_index_pid(Pid) ->
- case node(Pid) == node() of
- true -> gen_server:call(Pid, get_index_pid, infinity);
- false -> Pid
- end.
-design_doc_to_indexes(#doc{body={Fields}}=Doc) ->
- RawIndexes = couch_util:get_value(<<"indexes">>, Fields, {[]}),
- case RawIndexes of
- {IndexList} when is_list(IndexList) ->
- {IndexNames, _} = lists:unzip(IndexList),
- lists:flatmap(
- fun(IndexName) ->
- case (catch design_doc_to_index(Doc, IndexName)) of
- {ok, #index{}=Index} -> [Index];
- _ -> []
- end
- end,
- IndexNames);
- _ -> []
- end.
-% gen_server functions.
-init({DbName, Index}) ->
- process_flag(trap_exit, true),
- case open_index(DbName, Index) of
- {ok, Pid, Seq} ->
- State=#state{
- dbname=DbName,
- index=Index#index{current_seq=Seq, dbname=DbName},
- index_pid=Pid
- },
- case couch_db:open_int(DbName, []) of
- {ok, Db} ->
- try couch_db:monitor(Db) after couch_db:close(Db) end,
- dreyfus_util:maybe_create_local_purge_doc(Db, Pid, Index),
- proc_lib:init_ack({ok, self()}),
- gen_server:enter_loop(?MODULE, [], State);
- Error ->
- proc_lib:init_ack(Error)
- end;
- Error ->
- proc_lib:init_ack(Error)
- end.
-handle_call({await, RequestSeq}, From,
- #state{
- index=#index{dbname=DbName,name=IdxName,ddoc_id=DDocId,current_seq=Seq}=Index,
- index_pid=IndexPid,
- updater_pid=nil,
- waiting_list=WaitList
- }=State) when RequestSeq > Seq ->
- DbName2 = mem3:dbname(DbName),
- <<"_design/", GroupId/binary>> = DDocId,
- NewState = case dreyfus_util:in_black_list(DbName2, GroupId, IdxName) of
- false ->
- UpPid = spawn_link(fun() ->
- dreyfus_index_updater:update(IndexPid,Index)
- end),
- State#state{
- updater_pid=UpPid,
- waiting_list=[{From,RequestSeq}|WaitList]
- };
- _ ->
- couch_log:notice("Index Blocked from Updating - db: ~p,"
- " ddocid: ~p name: ~p", [DbName, DDocId, IdxName]),
- State
- end,
- {noreply, NewState};
-handle_call({await, RequestSeq}, _From,
- #state{index=#index{current_seq=Seq}}=State) when RequestSeq =< Seq ->
- {reply, {ok, State#state.index_pid, Seq}, State};
-handle_call({await, RequestSeq}, From, #state{waiting_list=WaitList}=State) ->
- {noreply, State#state{
- waiting_list=[{From,RequestSeq}|WaitList]
- }};
-handle_call(get_index_pid, _From, State) -> % upgrade
- {reply, State#state.index_pid, State};
-handle_call({search, QueryArgs0}, _From, State) -> % obsolete
- Reply = search_int(State#state.index_pid, QueryArgs0),
- {reply, Reply, State};
-handle_call({group1, QueryArgs0}, _From, State) -> % obsolete
- Reply = group1_int(State#state.index_pid, QueryArgs0),
- {reply, Reply, State};
-handle_call({group2, QueryArgs0}, _From, State) -> % obsolete
- Reply = group2_int(State#state.index_pid, QueryArgs0),
- {reply, Reply, State};
-handle_call(info, _From, State) -> % obsolete
- Reply = info_int(State#state.index_pid),
- {reply, Reply, State}.
-handle_cast(_Msg, State) ->
- {noreply, State}.
-handle_info({'EXIT', FromPid, {updated, NewSeq}},
- #state{
- index=#index{dbname=DbName,name=IdxName,ddoc_id=DDocId}=Index0,
- index_pid=IndexPid,
- updater_pid=UpPid,
- waiting_list=WaitList
- }=State) when UpPid == FromPid ->
- Index = Index0#index{current_seq=NewSeq},
- case reply_with_index(IndexPid, Index, WaitList) of
- [] ->
- {noreply, State#state{index=Index,
- updater_pid=nil,
- waiting_list=[]
- }};
- StillWaiting ->
- DbName2 = mem3:dbname(DbName),
- <<"_design/", GroupId/binary>> = DDocId,
- Pid = case dreyfus_util:in_black_list(DbName2, GroupId, IdxName) of
- true ->
- couch_log:notice("Index Blocked from Updating - db: ~p, ddocid: ~p"
- " name: ~p", [DbName, GroupId, IdxName]),
- nil;
- false ->
- spawn_link(fun() ->
- dreyfus_index_updater:update(IndexPid, Index)
- end)
- end,
- {noreply, State#state{index=Index,
- updater_pid=Pid,
- waiting_list=StillWaiting
- }}
- end;
-handle_info({'EXIT', _, {updated, _}}, State) ->
- {noreply, State};
-handle_info({'EXIT', FromPid, Reason}, #state{
- index=Index,
- index_pid=IndexPid,
- waiting_list=WaitList
- }=State) when FromPid == IndexPid ->
- couch_log:notice(
- "index for ~p closed with reason ~p", [index_name(Index), Reason]),
- [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList],
- {stop, normal, State};
-handle_info({'EXIT', FromPid, Reason}, #state{
- index=Index,
- updater_pid=UpPid,
- waiting_list=WaitList
- }=State) when FromPid == UpPid ->
- couch_log:info("Shutting down index server ~p, updater ~p closing w/ reason ~w",
- [index_name(Index), UpPid, Reason]),
- [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList],
- {stop, normal, State};
-handle_info({'EXIT', Pid, Reason}, State) ->
- % probably dreyfus_index_manager.
- couch_log:notice("Unknown pid ~p closed with reason ~p", [Pid, Reason]),
- {stop, normal, State};
-handle_info({'DOWN',_,_,Pid,Reason}, #state{
- index=Index,
- waiting_list=WaitList
- }=State) ->
- couch_log:info("Shutting down index server ~p, db ~p closing w/ reason ~w",
- [index_name(Index), Pid, Reason]),
- [gen_server:reply(P, {error, Reason}) || {P, _} <- WaitList],
- {stop, normal, State}.
-terminate(_Reason, _State) ->
- ok.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-format_status(_Opt, [_PDict, #state{index = #index{} = Index} = State]) ->
- #index{
- ddoc_id=Id,
- name=IndexName,
- sig=Sig
- } = Index,
- IndexScrubbed = [{
- {ddoc_id, Id},
- {name, IndexName},
- {sig, Sig}
- }],
- Scrubbed = State#state{
- index = IndexScrubbed,
- waiting_list = {length, length(State#state.waiting_list)}
- },
- ?record_to_keyval(state, Scrubbed);
-format_status(_Opt, [_PDict, #state{} = State]) ->
- Scrubbed = State#state{
- index = nil,
- waiting_list = {length, length(State#state.waiting_list)}
- },
- ?record_to_keyval(state, Scrubbed).
-% private functions.
-open_index(DbName, #index{analyzer=Analyzer, sig=Sig}) ->
- Path = <<DbName/binary,"/",Sig/binary>>,
- case clouseau_rpc:open_index(self(), Path, Analyzer) of
- {ok, Pid} ->
- case clouseau_rpc:get_update_seq(Pid) of
- {ok, Seq} ->
- {ok, Pid, Seq};
- Error ->
- Error
- end;
- Error ->
- Error
- end.
-design_doc_to_index(#doc{id=Id,body={Fields}}, IndexName) ->
- Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>),
- {RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}),
- InvalidDDocError = {invalid_design_doc,
- <<"index `", IndexName/binary, "` must have parameter `index`">>},
- case lists:keyfind(IndexName, 1, RawIndexes) of
- false ->
- {error, {not_found, <<IndexName/binary, " not found.">>}};
- {IndexName, {Index}} ->
- Analyzer = couch_util:get_value(<<"analyzer">>, Index, <<"standard">>),
- case couch_util:get_value(<<"index">>, Index) of
- undefined ->
- {error, InvalidDDocError};
- Def ->
- Sig = ?l2b(couch_util:to_hex(couch_hash:md5_hash(
- term_to_binary({Analyzer, Def})))),
- {ok, #index{
- analyzer=Analyzer,
- ddoc_id=Id,
- def=Def,
- def_lang=Language,
- name=IndexName,
- sig=Sig}}
- end;
- _ ->
- {error, InvalidDDocError}
- end.
-reply_with_index(IndexPid, Index, WaitList) ->
- reply_with_index(IndexPid, Index, WaitList, []).
-reply_with_index(_IndexPid, _Index, [], Acc) ->
- Acc;
-reply_with_index(IndexPid, #index{current_seq=IndexSeq}=Index, [{Pid, Seq}|Rest], Acc) when Seq =< IndexSeq ->
- gen_server:reply(Pid, {ok, IndexPid, IndexSeq}),
- reply_with_index(IndexPid, Index, Rest, Acc);
-reply_with_index(IndexPid, Index, [{Pid, Seq}|Rest], Acc) ->
- reply_with_index(IndexPid, Index, Rest, [{Pid, Seq}|Acc]).
-index_name(#index{dbname=DbName,ddoc_id=DDocId,name=IndexName}) ->
- <<DbName/binary, " ", DDocId/binary, " ", IndexName/binary>>.
-args_to_proplist(#index_query_args{} = Args) ->
- [
- {'query', Args#index_query_args.q},
- {partition, Args#index_query_args.partition},
- {limit, Args#index_query_args.limit},
- {refresh, Args#index_query_args.stale =:= false},
- {'after', Args#index_query_args.bookmark},
- {sort, Args#index_query_args.sort},
- {include_fields, Args#index_query_args.include_fields},
- {counts, Args#index_query_args.counts},
- {ranges, Args#index_query_args.ranges},
- {drilldown, Args#index_query_args.drilldown},
- {highlight_fields, Args#index_query_args.highlight_fields},
- {highlight_pre_tag, Args#index_query_args.highlight_pre_tag},
- {highlight_post_tag, Args#index_query_args.highlight_post_tag},
- {highlight_number, Args#index_query_args.highlight_number},
- {highlight_size, Args#index_query_args.highlight_size}
- ].
-args_to_proplist2(#index_query_args{} = Args) ->
- [
- {'query', Args#index_query_args.q},
- {field,},
- {refresh, Args#index_query_args.stale =:= false},
- {groups, Args#index_query_args.grouping#grouping.groups},
- {group_sort, Args#index_query_args.grouping#grouping.sort},
- {sort, Args#index_query_args.sort},
- {limit, Args#index_query_args.limit},
- {include_fields, Args#index_query_args.include_fields},
- {highlight_fields, Args#index_query_args.highlight_fields},
- {highlight_pre_tag, Args#index_query_args.highlight_pre_tag},
- {highlight_post_tag, Args#index_query_args.highlight_post_tag},
- {highlight_number, Args#index_query_args.highlight_number},
- {highlight_size, Args#index_query_args.highlight_size}
- ].
-search_int(Pid, QueryArgs0) ->
- QueryArgs = dreyfus_util:upgrade(QueryArgs0),
- Props = args_to_proplist(QueryArgs),
- clouseau_rpc:search(Pid, Props).
-group1_int(Pid, QueryArgs0) ->
- QueryArgs = dreyfus_util:upgrade(QueryArgs0),
- #index_query_args{
- q = Query,
- stale = Stale,
- grouping = #grouping{
- by = GroupBy,
- offset = Offset,
- limit = Limit,
- sort = Sort
- }
- } = QueryArgs,
- clouseau_rpc:group1(Pid, Query, GroupBy, Stale =:= false, Sort,
- Offset, Limit).
-group2_int(Pid, QueryArgs0) ->
- QueryArgs = dreyfus_util:upgrade(QueryArgs0),
- Props = args_to_proplist2(QueryArgs),
- clouseau_rpc:group2(Pid, Props).
-info_int(Pid) ->
- clouseau_rpc:info(Pid).