diff options
Diffstat (limited to 'src/couch_index/src/couch_index.erl')
-rw-r--r-- | src/couch_index/src/couch_index.erl | 194 |
1 files changed, 95 insertions, 99 deletions
diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl index 83eadc706..a6b62be7c 100644 --- a/src/couch_index/src/couch_index.erl +++ b/src/couch_index/src/couch_index.erl @@ -26,48 +26,40 @@ -export([init/1, terminate/2, code_change/3]). -export([handle_call/3, handle_cast/2, handle_info/2]). - -include_lib("couch/include/couch_db.hrl"). - --define(CHECK_INTERVAL, 600000). % 10 minutes +% 10 minutes +-define(CHECK_INTERVAL, 600000). -record(st, { mod, idx_state, updater, compactor, - waiters=[], - committed=true, - shutdown=false + waiters = [], + committed = true, + shutdown = false }). - start_link({Module0, IdxState0}) -> [Module, IdxState] = couch_index_plugin:before_open(Module0, IdxState0), proc_lib:start_link(?MODULE, init, [{Module, IdxState}]). - stop(Pid) -> gen_server:cast(Pid, stop). - get_state(Pid, RequestSeq) -> gen_server:call(Pid, {get_state, RequestSeq}, infinity). - get_info(Pid) -> gen_server:call(Pid, get_info, group_info_timeout_msec()). - trigger_update(Pid, UpdateSeq) -> gen_server:cast(Pid, {trigger_update, UpdateSeq}). - compact(Pid) -> compact(Pid, []). - compact(Pid, Options) -> {ok, CPid} = gen_server:call(Pid, compact), case lists:member(monitor, Options) of @@ -75,7 +67,6 @@ compact(Pid, Options) -> false -> ok end. - get_compactor_pid(Pid) -> gen_server:call(Pid, get_compactor_pid). @@ -96,10 +87,10 @@ init({Mod, IdxState}) -> {ok, UPid} = couch_index_updater:start_link(self(), Mod), {ok, CPid} = couch_index_compactor:start_link(self(), Mod), State = #st{ - mod=Mod, - idx_state=NewIdxState, - updater=UPid, - compactor=CPid + mod = Mod, + idx_state = NewIdxState, + updater = UPid, + compactor = CPid }, Args = [ Mod:get(db_name, IdxState), @@ -113,9 +104,8 @@ init({Mod, IdxState}) -> proc_lib:init_ack(Other) end. - terminate(Reason0, State) -> - #st{mod=Mod, idx_state=IdxState}=State, + #st{mod = Mod, idx_state = IdxState} = State, case Reason0 of {shutdown, ddoc_updated} -> Mod:shutdown(IdxState), @@ -136,24 +126,24 @@ terminate(Reason0, State) -> couch_log:debug("Closing index for db: ~s idx: ~s sig: ~p because ~r", Args), ok. - handle_call({get_state, ReqSeq}, From, State) -> #st{ - mod=Mod, - idx_state=IdxState, - waiters=Waiters + mod = Mod, + idx_state = IdxState, + waiters = Waiters } = State, IdxSeq = Mod:get(update_seq, IdxState), case ReqSeq =< IdxSeq of true -> {reply, {ok, IdxState}, State}; - _ -> % View update required + % View update required + _ -> couch_index_updater:run(State#st.updater, IdxState), Waiters2 = [{From, ReqSeq} | Waiters], - {noreply, State#st{waiters=Waiters2}, infinity} + {noreply, State#st{waiters = Waiters2}, infinity} end; handle_call(get_info, _From, State) -> - #st{mod=Mod} = State, + #st{mod = Mod} = State, IdxState = State#st.idx_state, {ok, Info0} = Mod:get(info, IdxState), IsUpdating = couch_index_updater:is_running(State#st.updater), @@ -162,21 +152,23 @@ handle_call(get_info, _From, State) -> GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end, DbName = Mod:get(db_name, IdxState), CommittedSeq = couch_util:with_db(DbName, GetCommSeq), - Info = Info0 ++ [ - {updater_running, IsUpdating}, - {compact_running, IsCompacting}, - {waiting_commit, State#st.committed == false}, - {waiting_clients, length(State#st.waiters)}, - {pending_updates, max(CommittedSeq - IdxSeq, 0)} - ], + Info = + Info0 ++ + [ + {updater_running, IsUpdating}, + {compact_running, IsCompacting}, + {waiting_commit, State#st.committed == false}, + {waiting_clients, length(State#st.waiters)}, + {pending_updates, max(CommittedSeq - IdxSeq, 0)} + ], {reply, {ok, Info}, State}; handle_call(reset, _From, State) -> #st{ - mod=Mod, - idx_state=IdxState + mod = Mod, + idx_state = IdxState } = State, {ok, NewIdxState} = Mod:reset(IdxState), - {reply, {ok, NewIdxState}, State#st{idx_state=NewIdxState}}; + {reply, {ok, NewIdxState}, State#st{idx_state = NewIdxState}}; handle_call(compact, _From, State) -> Resp = couch_index_compactor:run(State#st.compactor, State#st.idx_state), {reply, Resp, State}; @@ -184,8 +176,8 @@ handle_call(get_compactor_pid, _From, State) -> {reply, {ok, State#st.compactor}, State}; handle_call({compacted, NewIdxState}, _From, State) -> #st{ - mod=Mod, - idx_state=OldIdxState + mod = Mod, + idx_state = OldIdxState } = State, assert_signature_match(Mod, OldIdxState, NewIdxState), NewSeq = Mod:get(update_seq, NewIdxState), @@ -215,8 +207,8 @@ handle_call({compaction_failed, Reason}, _From, State) -> handle_cast({trigger_update, UpdateSeq}, State) -> #st{ - mod=Mod, - idx_state=IdxState + mod = Mod, + idx_state = IdxState } = State, case UpdateSeq =< Mod:get(update_seq, IdxState) of true -> @@ -236,8 +228,8 @@ handle_cast({updated, NewIdxState}, State) -> end; handle_cast({new_state, NewIdxState}, State) -> #st{ - mod=Mod, - idx_state=OldIdxState + mod = Mod, + idx_state = OldIdxState } = State, OldFd = Mod:get(fd, OldIdxState), NewFd = Mod:get(fd, NewIdxState), @@ -257,9 +249,9 @@ handle_cast({new_state, NewIdxState}, State) -> false -> ok end, {noreply, State#st{ - idx_state=NewIdxState, - waiters=Rest, - committed=false + idx_state = NewIdxState, + waiters = Rest, + committed = false }}; false -> Fmt = "Ignoring update from old indexer for db: ~s idx: ~s", @@ -272,25 +264,26 @@ handle_cast({new_state, NewIdxState}, State) -> end; handle_cast({update_error, Error}, State) -> send_all(State#st.waiters, Error), - {noreply, State#st{waiters=[]}}; + {noreply, State#st{waiters = []}}; handle_cast(stop, State) -> {stop, normal, State}; handle_cast(delete, State) -> - #st{mod=Mod, idx_state=IdxState} = State, + #st{mod = Mod, idx_state = IdxState} = State, ok = Mod:delete(IdxState), {stop, normal, State}; handle_cast({ddoc_updated, DDocResult}, State) -> #st{mod = Mod, idx_state = IdxState} = State, - Shutdown = case DDocResult of - {not_found, deleted} -> - true; - {ok, DDoc} -> - DbName = Mod:get(db_name, IdxState), - couch_util:with_db(DbName, fun(Db) -> - {ok, NewIdxState} = Mod:init(Db, DDoc), - Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState) - end) - end, + Shutdown = + case DDocResult of + {not_found, deleted} -> + true; + {ok, DDoc} -> + DbName = Mod:get(db_name, IdxState), + couch_util:with_db(DbName, fun(Db) -> + {ok, NewIdxState} = Mod:init(Db, DDoc), + Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState) + end) + end, case Shutdown of true -> {stop, {shutdown, ddoc_updated}, State#st{shutdown = true}}; @@ -319,10 +312,10 @@ handle_cast(ddoc_updated, State) -> handle_cast(_Mesg, State) -> {stop, unhandled_cast, State}. -handle_info(commit, #st{committed=true}=State) -> +handle_info(commit, #st{committed = true} = State) -> {noreply, State}; handle_info(commit, State) -> - #st{mod=Mod, idx_state=IdxState} = State, + #st{mod = Mod, idx_state = IdxState} = State, DbName = Mod:get(db_name, IdxState), IdxName = Mod:get(idx_name, IdxState), GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end, @@ -332,7 +325,7 @@ handle_info(commit, State) -> % Commit the updates ok = Mod:commit(IdxState), couch_event:notify(DbName, {index_commit, IdxName}), - {noreply, State#st{committed=true}}; + {noreply, State#st{committed = true}}; _ -> % We can't commit the header because the database seq that's % fully committed to disk is still behind us. If we committed @@ -366,25 +359,23 @@ handle_info(maybe_close, State) -> erlang:send_after(?CHECK_INTERVAL, self(), maybe_close), {noreply, State} end; -handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) -> +handle_info({'DOWN', _, _, _Pid, _}, #st{mod = Mod, idx_state = IdxState} = State) -> Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)], couch_log:debug("Index shutdown by monitor notice for db: ~s idx: ~s", Args), catch send_all(State#st.waiters, shutdown), - {stop, normal, State#st{waiters=[]}}. + {stop, normal, State#st{waiters = []}}. code_change(_OldVsn, State, _Extra) -> {ok, State}. -maybe_restart_updater(#st{waiters=[]}) -> +maybe_restart_updater(#st{waiters = []}) -> ok; -maybe_restart_updater(#st{idx_state=IdxState}=State) -> +maybe_restart_updater(#st{idx_state = IdxState} = State) -> couch_index_updater:run(State#st.updater, IdxState). - send_all(Waiters, Reply) -> [gen_server:reply(From, Reply) || {From, _} <- Waiters]. - send_replies(Waiters, UpdateSeq, IdxState) -> Pred = fun({_, S}) -> S =< UpdateSeq end, {ToSend, Remaining} = lists:partition(Pred, Waiters), @@ -399,9 +390,9 @@ assert_signature_match(Mod, OldIdxState, NewIdxState) -> commit_compacted(NewIdxState, State) -> #st{ - mod=Mod, - idx_state=OldIdxState, - updater=Updater + mod = Mod, + idx_state = OldIdxState, + updater = Updater } = State, {ok, NewIdxState1} = Mod:swap_compacted(OldIdxState, NewIdxState), % Restart the indexer if it's running. @@ -414,9 +405,9 @@ commit_compacted(NewIdxState, State) -> false -> ok end, State#st{ - idx_state=NewIdxState1, - committed=false - }. + idx_state = NewIdxState1, + committed = false + }. is_recompaction_enabled(IdxState, #st{mod = Mod}) -> DbName = binary_to_list(Mod:get(db_name, IdxState)), @@ -449,7 +440,6 @@ get_value(Section, Key) -> commit_delay() -> config:get_integer("query_server_config", "commit_freq", 5) * 1000. - group_info_timeout_msec() -> Timeout = config:get("query_server_config", "group_info_timeout", "5000"), case Timeout of @@ -459,7 +449,6 @@ group_info_timeout_msec() -> list_to_integer(Milliseconds) end. - -ifdef(TEST). -include_lib("couch/include/couch_eunit.hrl"). @@ -468,7 +457,7 @@ get(db_name, _, _) -> get(idx_name, _, _) -> <<"idx_name">>; get(signature, _, _) -> - <<61,237,157,230,136,93,96,201,204,17,137,186,50,249,44,135>>. + <<61, 237, 157, 230, 136, 93, 96, 201, 204, 17, 137, 186, 50, 249, 44, 135>>. setup_all() -> Ctx = test_util:start_couch(), @@ -526,7 +515,7 @@ recompaction_configuration_tests() -> EnabledCases = [ [undefined, undefined, undefined], - [undefined, undefined,"enabled"], + [undefined, undefined, "enabled"], [undefined, "enabled", undefined], [undefined, "disabled", "enabled"], [undefined, "enabled", "enabled"], @@ -563,21 +552,26 @@ recompaction_configuration_tests() -> ?assertEqual([], AllCases -- (EnabledCases ++ DisabledCases)), - [{Settings, fun should_not_call_recompact/2} || Settings <- DisabledCases] - ++ - [{Settings, fun should_call_recompact/2} || Settings <- EnabledCases]. + [{Settings, fun should_not_call_recompact/2} || Settings <- DisabledCases] ++ + [{Settings, fun should_call_recompact/2} || Settings <- EnabledCases]. should_call_recompact(Settings, {IdxState, State}) -> - {test_id(Settings), ?_test(begin - ?assert(is_recompaction_enabled(IdxState, State)), - ok - end)}. + { + test_id(Settings), + ?_test(begin + ?assert(is_recompaction_enabled(IdxState, State)), + ok + end) + }. should_not_call_recompact(Settings, {IdxState, State}) -> - {test_id(Settings), ?_test(begin - ?assertNot(is_recompaction_enabled(IdxState, State)), - ok - end)}. + { + test_id(Settings), + ?_test(begin + ?assertNot(is_recompaction_enabled(IdxState, State)), + ok + end) + }. to_string(undefined) -> "undefined"; to_string(Value) -> Value. @@ -586,7 +580,6 @@ test_id(Settings0) -> Settings1 = [to_string(Value) || Value <- Settings0], "[ " ++ lists:flatten(string:join(Settings1, " , ")) ++ " ]". - get_group_timeout_info_test_() -> { foreach, @@ -598,25 +591,28 @@ get_group_timeout_info_test_() -> ] }. - t_group_timeout_info_integer() -> - ?_test(begin - meck:expect(config, get, + ?_test(begin + meck:expect( + config, + get, fun("query_server_config", "group_info_timeout", _) -> - "5001" - end), + "5001" + end + ), ?assertEqual(5001, group_info_timeout_msec()) end). - t_group_timeout_info_infinity() -> - ?_test(begin - meck:expect(config, get, + ?_test(begin + meck:expect( + config, + get, fun("query_server_config", "group_info_timeout", _) -> "infinity" - end), + end + ), ?assertEqual(infinity, group_info_timeout_msec()) end). - -endif. |