diff options
Diffstat (limited to 'src/dreyfus/src/dreyfus_index.erl')
-rw-r--r-- | src/dreyfus/src/dreyfus_index.erl | 367 |
1 files changed, 0 insertions, 367 deletions
diff --git a/src/dreyfus/src/dreyfus_index.erl b/src/dreyfus/src/dreyfus_index.erl deleted file mode 100644 index 2bf560f37..000000000 --- a/src/dreyfus/src/dreyfus_index.erl +++ /dev/null @@ -1,367 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - - -%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- - -%% A dreyfus_index gen_server is linked to its clouseau twin. - --module(dreyfus_index). --behaviour(gen_server). --vsn(1). --include_lib("couch/include/couch_db.hrl"). --include("dreyfus.hrl"). - - -% public api. --export([start_link/2, design_doc_to_index/2, await/2, search/2, info/1, - group1/2, group2/2, - design_doc_to_indexes/1]). - -% gen_server api. --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - -% private definitions. --record(state, { - dbname, - index, - updater_pid=nil, - index_pid=nil, - waiting_list=[] -}). - -% exported for callback. --export([search_int/2, group1_int/2, group2_int/2, info_int/1]). - -% public functions. -start_link(DbName, Index) -> - proc_lib:start_link(?MODULE, init, [{DbName, Index}]). - -await(Pid, MinSeq) -> - MFA = {gen_server, call, [Pid, {await, MinSeq}, infinity]}, - dreyfus_util:time([index, await], MFA). - -search(Pid0, QueryArgs) -> - Pid = to_index_pid(Pid0), - MFA = {?MODULE, search_int, [Pid, QueryArgs]}, - dreyfus_util:time([index, search], MFA). - -group1(Pid0, QueryArgs) -> - Pid = to_index_pid(Pid0), - MFA = {?MODULE, group1_int, [Pid, QueryArgs]}, - dreyfus_util:time([index, group1], MFA). - -group2(Pid0, QueryArgs) -> - Pid = to_index_pid(Pid0), - MFA = {?MODULE, group2_int, [Pid, QueryArgs]}, - dreyfus_util:time([index, group2], MFA). - -info(Pid0) -> - Pid = to_index_pid(Pid0), - MFA = {?MODULE, info_int, [Pid]}, - dreyfus_util:time([index, info], MFA). - -%% We either have a dreyfus_index gen_server pid or the remote -%% clouseau pid. -to_index_pid(Pid) -> - case node(Pid) == node() of - true -> gen_server:call(Pid, get_index_pid, infinity); - false -> Pid - end. - -design_doc_to_indexes(#doc{body={Fields}}=Doc) -> - RawIndexes = couch_util:get_value(<<"indexes">>, Fields, {[]}), - case RawIndexes of - {IndexList} when is_list(IndexList) -> - {IndexNames, _} = lists:unzip(IndexList), - lists:flatmap( - fun(IndexName) -> - case (catch design_doc_to_index(Doc, IndexName)) of - {ok, #index{}=Index} -> [Index]; - _ -> [] - end - end, - IndexNames); - _ -> [] - end. - -% gen_server functions. - -init({DbName, Index}) -> - process_flag(trap_exit, true), - case open_index(DbName, Index) of - {ok, Pid, Seq} -> - State=#state{ - dbname=DbName, - index=Index#index{current_seq=Seq, dbname=DbName}, - index_pid=Pid - }, - case couch_db:open_int(DbName, []) of - {ok, Db} -> - try couch_db:monitor(Db) after couch_db:close(Db) end, - dreyfus_util:maybe_create_local_purge_doc(Db, Pid, Index), - proc_lib:init_ack({ok, self()}), - gen_server:enter_loop(?MODULE, [], State); - Error -> - proc_lib:init_ack(Error) - end; - Error -> - proc_lib:init_ack(Error) - end. - -handle_call({await, RequestSeq}, From, - #state{ - index=#index{dbname=DbName,name=IdxName,ddoc_id=DDocId,current_seq=Seq}=Index, - index_pid=IndexPid, - updater_pid=nil, - waiting_list=WaitList - }=State) when RequestSeq > Seq -> - DbName2 = mem3:dbname(DbName), - <<"_design/", GroupId/binary>> = DDocId, - NewState = case dreyfus_util:in_black_list(DbName2, GroupId, IdxName) of - false -> - UpPid = spawn_link(fun() -> - dreyfus_index_updater:update(IndexPid,Index) - end), - State#state{ - updater_pid=UpPid, - waiting_list=[{From,RequestSeq}|WaitList] - }; - _ -> - couch_log:notice("Index Blocked from Updating - db: ~p," - " ddocid: ~p name: ~p", [DbName, DDocId, IdxName]), - State - end, - {noreply, NewState}; -handle_call({await, RequestSeq}, _From, - #state{index=#index{current_seq=Seq}}=State) when RequestSeq =< Seq -> - {reply, {ok, State#state.index_pid, Seq}, State}; -handle_call({await, RequestSeq}, From, #state{waiting_list=WaitList}=State) -> - {noreply, State#state{ - waiting_list=[{From,RequestSeq}|WaitList] - }}; - -handle_call(get_index_pid, _From, State) -> % upgrade - {reply, State#state.index_pid, State}; - -handle_call({search, QueryArgs0}, _From, State) -> % obsolete - Reply = search_int(State#state.index_pid, QueryArgs0), - {reply, Reply, State}; - -handle_call({group1, QueryArgs0}, _From, State) -> % obsolete - Reply = group1_int(State#state.index_pid, QueryArgs0), - {reply, Reply, State}; - -handle_call({group2, QueryArgs0}, _From, State) -> % obsolete - Reply = group2_int(State#state.index_pid, QueryArgs0), - {reply, Reply, State}; - -handle_call(info, _From, State) -> % obsolete - Reply = info_int(State#state.index_pid), - {reply, Reply, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({'EXIT', FromPid, {updated, NewSeq}}, - #state{ - index=#index{dbname=DbName,name=IdxName,ddoc_id=DDocId}=Index0, - index_pid=IndexPid, - updater_pid=UpPid, - waiting_list=WaitList - }=State) when UpPid == FromPid -> - Index = Index0#index{current_seq=NewSeq}, - case reply_with_index(IndexPid, Index, WaitList) of - [] -> - {noreply, State#state{index=Index, - updater_pid=nil, - waiting_list=[] - }}; - StillWaiting -> - DbName2 = mem3:dbname(DbName), - <<"_design/", GroupId/binary>> = DDocId, - Pid = case dreyfus_util:in_black_list(DbName2, GroupId, IdxName) of - true -> - couch_log:notice("Index Blocked from Updating - db: ~p, ddocid: ~p" - " name: ~p", [DbName, GroupId, IdxName]), - nil; - false -> - spawn_link(fun() -> - dreyfus_index_updater:update(IndexPid, Index) - end) - end, - {noreply, State#state{index=Index, - updater_pid=Pid, - waiting_list=StillWaiting - }} - end; -handle_info({'EXIT', _, {updated, _}}, State) -> - {noreply, State}; -handle_info({'EXIT', FromPid, Reason}, #state{ - index=Index, - index_pid=IndexPid, - waiting_list=WaitList - }=State) when FromPid == IndexPid -> - couch_log:notice( - "index for ~p closed with reason ~p", [index_name(Index), Reason]), - [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList], - {stop, normal, State}; -handle_info({'EXIT', FromPid, Reason}, #state{ - index=Index, - updater_pid=UpPid, - waiting_list=WaitList - }=State) when FromPid == UpPid -> - couch_log:info("Shutting down index server ~p, updater ~p closing w/ reason ~w", - [index_name(Index), UpPid, Reason]), - [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList], - {stop, normal, State}; -handle_info({'EXIT', Pid, Reason}, State) -> - % probably dreyfus_index_manager. - couch_log:notice("Unknown pid ~p closed with reason ~p", [Pid, Reason]), - {stop, normal, State}; -handle_info({'DOWN',_,_,Pid,Reason}, #state{ - index=Index, - waiting_list=WaitList - }=State) -> - couch_log:info("Shutting down index server ~p, db ~p closing w/ reason ~w", - [index_name(Index), Pid, Reason]), - [gen_server:reply(P, {error, Reason}) || {P, _} <- WaitList], - {stop, normal, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -% private functions. - -open_index(DbName, #index{analyzer=Analyzer, sig=Sig}) -> - Path = <<DbName/binary,"/",Sig/binary>>, - case clouseau_rpc:open_index(self(), Path, Analyzer) of - {ok, Pid} -> - case clouseau_rpc:get_update_seq(Pid) of - {ok, Seq} -> - {ok, Pid, Seq}; - Error -> - Error - end; - Error -> - Error - end. - -design_doc_to_index(#doc{id=Id,body={Fields}}, IndexName) -> - Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>), - {RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}), - InvalidDDocError = {invalid_design_doc, - <<"index `", IndexName/binary, "` must have parameter `index`">>}, - case lists:keyfind(IndexName, 1, RawIndexes) of - false -> - {error, {not_found, <<IndexName/binary, " not found.">>}}; - {IndexName, {Index}} -> - Analyzer = couch_util:get_value(<<"analyzer">>, Index, <<"standard">>), - case couch_util:get_value(<<"index">>, Index) of - undefined -> - {error, InvalidDDocError}; - Def -> - Sig = ?l2b(couch_util:to_hex(couch_hash:md5_hash( - term_to_binary({Analyzer, Def})))), - {ok, #index{ - analyzer=Analyzer, - ddoc_id=Id, - def=Def, - def_lang=Language, - name=IndexName, - sig=Sig}} - end; - _ -> - {error, InvalidDDocError} - end. - -reply_with_index(IndexPid, Index, WaitList) -> - reply_with_index(IndexPid, Index, WaitList, []). - -reply_with_index(_IndexPid, _Index, [], Acc) -> - Acc; -reply_with_index(IndexPid, #index{current_seq=IndexSeq}=Index, [{Pid, Seq}|Rest], Acc) when Seq =< IndexSeq -> - gen_server:reply(Pid, {ok, IndexPid, IndexSeq}), - reply_with_index(IndexPid, Index, Rest, Acc); -reply_with_index(IndexPid, Index, [{Pid, Seq}|Rest], Acc) -> - reply_with_index(IndexPid, Index, Rest, [{Pid, Seq}|Acc]). - -index_name(#index{dbname=DbName,ddoc_id=DDocId,name=IndexName}) -> - <<DbName/binary, " ", DDocId/binary, " ", IndexName/binary>>. - -args_to_proplist(#index_query_args{} = Args) -> - [ - {'query', Args#index_query_args.q}, - {partition, Args#index_query_args.partition}, - {limit, Args#index_query_args.limit}, - {refresh, Args#index_query_args.stale =:= false}, - {'after', Args#index_query_args.bookmark}, - {sort, Args#index_query_args.sort}, - {include_fields, Args#index_query_args.include_fields}, - {counts, Args#index_query_args.counts}, - {ranges, Args#index_query_args.ranges}, - {drilldown, Args#index_query_args.drilldown}, - {highlight_fields, Args#index_query_args.highlight_fields}, - {highlight_pre_tag, Args#index_query_args.highlight_pre_tag}, - {highlight_post_tag, Args#index_query_args.highlight_post_tag}, - {highlight_number, Args#index_query_args.highlight_number}, - {highlight_size, Args#index_query_args.highlight_size} - ]. - -args_to_proplist2(#index_query_args{} = Args) -> - [ - {'query', Args#index_query_args.q}, - {field, Args#index_query_args.grouping#grouping.by}, - {refresh, Args#index_query_args.stale =:= false}, - {groups, Args#index_query_args.grouping#grouping.groups}, - {group_sort, Args#index_query_args.grouping#grouping.sort}, - {sort, Args#index_query_args.sort}, - {limit, Args#index_query_args.limit}, - {include_fields, Args#index_query_args.include_fields}, - {highlight_fields, Args#index_query_args.highlight_fields}, - {highlight_pre_tag, Args#index_query_args.highlight_pre_tag}, - {highlight_post_tag, Args#index_query_args.highlight_post_tag}, - {highlight_number, Args#index_query_args.highlight_number}, - {highlight_size, Args#index_query_args.highlight_size} - ]. - -search_int(Pid, QueryArgs0) -> - QueryArgs = dreyfus_util:upgrade(QueryArgs0), - Props = args_to_proplist(QueryArgs), - clouseau_rpc:search(Pid, Props). - -group1_int(Pid, QueryArgs0) -> - QueryArgs = dreyfus_util:upgrade(QueryArgs0), - #index_query_args{ - q = Query, - stale = Stale, - grouping = #grouping{ - by = GroupBy, - offset = Offset, - limit = Limit, - sort = Sort - } - } = QueryArgs, - clouseau_rpc:group1(Pid, Query, GroupBy, Stale =:= false, Sort, - Offset, Limit). - -group2_int(Pid, QueryArgs0) -> - QueryArgs = dreyfus_util:upgrade(QueryArgs0), - Props = args_to_proplist2(QueryArgs), - clouseau_rpc:group2(Pid, Props). - -info_int(Pid) -> - clouseau_rpc:info(Pid). |