summaryrefslogtreecommitdiff
path: root/src/couch_index/src/couch_index.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_index/src/couch_index.erl')
-rw-r--r--src/couch_index/src/couch_index.erl194
1 files changed, 95 insertions, 99 deletions
diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl
index 83eadc706..a6b62be7c 100644
--- a/src/couch_index/src/couch_index.erl
+++ b/src/couch_index/src/couch_index.erl
@@ -26,48 +26,40 @@
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
-
-include_lib("couch/include/couch_db.hrl").
-
--define(CHECK_INTERVAL, 600000). % 10 minutes
+% 10 minutes
+-define(CHECK_INTERVAL, 600000).
-record(st, {
mod,
idx_state,
updater,
compactor,
- waiters=[],
- committed=true,
- shutdown=false
+ waiters = [],
+ committed = true,
+ shutdown = false
}).
-
start_link({Module0, IdxState0}) ->
[Module, IdxState] = couch_index_plugin:before_open(Module0, IdxState0),
proc_lib:start_link(?MODULE, init, [{Module, IdxState}]).
-
stop(Pid) ->
gen_server:cast(Pid, stop).
-
get_state(Pid, RequestSeq) ->
gen_server:call(Pid, {get_state, RequestSeq}, infinity).
-
get_info(Pid) ->
gen_server:call(Pid, get_info, group_info_timeout_msec()).
-
trigger_update(Pid, UpdateSeq) ->
gen_server:cast(Pid, {trigger_update, UpdateSeq}).
-
compact(Pid) ->
compact(Pid, []).
-
compact(Pid, Options) ->
{ok, CPid} = gen_server:call(Pid, compact),
case lists:member(monitor, Options) of
@@ -75,7 +67,6 @@ compact(Pid, Options) ->
false -> ok
end.
-
get_compactor_pid(Pid) ->
gen_server:call(Pid, get_compactor_pid).
@@ -96,10 +87,10 @@ init({Mod, IdxState}) ->
{ok, UPid} = couch_index_updater:start_link(self(), Mod),
{ok, CPid} = couch_index_compactor:start_link(self(), Mod),
State = #st{
- mod=Mod,
- idx_state=NewIdxState,
- updater=UPid,
- compactor=CPid
+ mod = Mod,
+ idx_state = NewIdxState,
+ updater = UPid,
+ compactor = CPid
},
Args = [
Mod:get(db_name, IdxState),
@@ -113,9 +104,8 @@ init({Mod, IdxState}) ->
proc_lib:init_ack(Other)
end.
-
terminate(Reason0, State) ->
- #st{mod=Mod, idx_state=IdxState}=State,
+ #st{mod = Mod, idx_state = IdxState} = State,
case Reason0 of
{shutdown, ddoc_updated} ->
Mod:shutdown(IdxState),
@@ -136,24 +126,24 @@ terminate(Reason0, State) ->
couch_log:debug("Closing index for db: ~s idx: ~s sig: ~p because ~r", Args),
ok.
-
handle_call({get_state, ReqSeq}, From, State) ->
#st{
- mod=Mod,
- idx_state=IdxState,
- waiters=Waiters
+ mod = Mod,
+ idx_state = IdxState,
+ waiters = Waiters
} = State,
IdxSeq = Mod:get(update_seq, IdxState),
case ReqSeq =< IdxSeq of
true ->
{reply, {ok, IdxState}, State};
- _ -> % View update required
+ % View update required
+ _ ->
couch_index_updater:run(State#st.updater, IdxState),
Waiters2 = [{From, ReqSeq} | Waiters],
- {noreply, State#st{waiters=Waiters2}, infinity}
+ {noreply, State#st{waiters = Waiters2}, infinity}
end;
handle_call(get_info, _From, State) ->
- #st{mod=Mod} = State,
+ #st{mod = Mod} = State,
IdxState = State#st.idx_state,
{ok, Info0} = Mod:get(info, IdxState),
IsUpdating = couch_index_updater:is_running(State#st.updater),
@@ -162,21 +152,23 @@ handle_call(get_info, _From, State) ->
GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end,
DbName = Mod:get(db_name, IdxState),
CommittedSeq = couch_util:with_db(DbName, GetCommSeq),
- Info = Info0 ++ [
- {updater_running, IsUpdating},
- {compact_running, IsCompacting},
- {waiting_commit, State#st.committed == false},
- {waiting_clients, length(State#st.waiters)},
- {pending_updates, max(CommittedSeq - IdxSeq, 0)}
- ],
+ Info =
+ Info0 ++
+ [
+ {updater_running, IsUpdating},
+ {compact_running, IsCompacting},
+ {waiting_commit, State#st.committed == false},
+ {waiting_clients, length(State#st.waiters)},
+ {pending_updates, max(CommittedSeq - IdxSeq, 0)}
+ ],
{reply, {ok, Info}, State};
handle_call(reset, _From, State) ->
#st{
- mod=Mod,
- idx_state=IdxState
+ mod = Mod,
+ idx_state = IdxState
} = State,
{ok, NewIdxState} = Mod:reset(IdxState),
- {reply, {ok, NewIdxState}, State#st{idx_state=NewIdxState}};
+ {reply, {ok, NewIdxState}, State#st{idx_state = NewIdxState}};
handle_call(compact, _From, State) ->
Resp = couch_index_compactor:run(State#st.compactor, State#st.idx_state),
{reply, Resp, State};
@@ -184,8 +176,8 @@ handle_call(get_compactor_pid, _From, State) ->
{reply, {ok, State#st.compactor}, State};
handle_call({compacted, NewIdxState}, _From, State) ->
#st{
- mod=Mod,
- idx_state=OldIdxState
+ mod = Mod,
+ idx_state = OldIdxState
} = State,
assert_signature_match(Mod, OldIdxState, NewIdxState),
NewSeq = Mod:get(update_seq, NewIdxState),
@@ -215,8 +207,8 @@ handle_call({compaction_failed, Reason}, _From, State) ->
handle_cast({trigger_update, UpdateSeq}, State) ->
#st{
- mod=Mod,
- idx_state=IdxState
+ mod = Mod,
+ idx_state = IdxState
} = State,
case UpdateSeq =< Mod:get(update_seq, IdxState) of
true ->
@@ -236,8 +228,8 @@ handle_cast({updated, NewIdxState}, State) ->
end;
handle_cast({new_state, NewIdxState}, State) ->
#st{
- mod=Mod,
- idx_state=OldIdxState
+ mod = Mod,
+ idx_state = OldIdxState
} = State,
OldFd = Mod:get(fd, OldIdxState),
NewFd = Mod:get(fd, NewIdxState),
@@ -257,9 +249,9 @@ handle_cast({new_state, NewIdxState}, State) ->
false -> ok
end,
{noreply, State#st{
- idx_state=NewIdxState,
- waiters=Rest,
- committed=false
+ idx_state = NewIdxState,
+ waiters = Rest,
+ committed = false
}};
false ->
Fmt = "Ignoring update from old indexer for db: ~s idx: ~s",
@@ -272,25 +264,26 @@ handle_cast({new_state, NewIdxState}, State) ->
end;
handle_cast({update_error, Error}, State) ->
send_all(State#st.waiters, Error),
- {noreply, State#st{waiters=[]}};
+ {noreply, State#st{waiters = []}};
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(delete, State) ->
- #st{mod=Mod, idx_state=IdxState} = State,
+ #st{mod = Mod, idx_state = IdxState} = State,
ok = Mod:delete(IdxState),
{stop, normal, State};
handle_cast({ddoc_updated, DDocResult}, State) ->
#st{mod = Mod, idx_state = IdxState} = State,
- Shutdown = case DDocResult of
- {not_found, deleted} ->
- true;
- {ok, DDoc} ->
- DbName = Mod:get(db_name, IdxState),
- couch_util:with_db(DbName, fun(Db) ->
- {ok, NewIdxState} = Mod:init(Db, DDoc),
- Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState)
- end)
- end,
+ Shutdown =
+ case DDocResult of
+ {not_found, deleted} ->
+ true;
+ {ok, DDoc} ->
+ DbName = Mod:get(db_name, IdxState),
+ couch_util:with_db(DbName, fun(Db) ->
+ {ok, NewIdxState} = Mod:init(Db, DDoc),
+ Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState)
+ end)
+ end,
case Shutdown of
true ->
{stop, {shutdown, ddoc_updated}, State#st{shutdown = true}};
@@ -319,10 +312,10 @@ handle_cast(ddoc_updated, State) ->
handle_cast(_Mesg, State) ->
{stop, unhandled_cast, State}.
-handle_info(commit, #st{committed=true}=State) ->
+handle_info(commit, #st{committed = true} = State) ->
{noreply, State};
handle_info(commit, State) ->
- #st{mod=Mod, idx_state=IdxState} = State,
+ #st{mod = Mod, idx_state = IdxState} = State,
DbName = Mod:get(db_name, IdxState),
IdxName = Mod:get(idx_name, IdxState),
GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end,
@@ -332,7 +325,7 @@ handle_info(commit, State) ->
% Commit the updates
ok = Mod:commit(IdxState),
couch_event:notify(DbName, {index_commit, IdxName}),
- {noreply, State#st{committed=true}};
+ {noreply, State#st{committed = true}};
_ ->
% We can't commit the header because the database seq that's
% fully committed to disk is still behind us. If we committed
@@ -366,25 +359,23 @@ handle_info(maybe_close, State) ->
erlang:send_after(?CHECK_INTERVAL, self(), maybe_close),
{noreply, State}
end;
-handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) ->
+handle_info({'DOWN', _, _, _Pid, _}, #st{mod = Mod, idx_state = IdxState} = State) ->
Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
couch_log:debug("Index shutdown by monitor notice for db: ~s idx: ~s", Args),
catch send_all(State#st.waiters, shutdown),
- {stop, normal, State#st{waiters=[]}}.
+ {stop, normal, State#st{waiters = []}}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-maybe_restart_updater(#st{waiters=[]}) ->
+maybe_restart_updater(#st{waiters = []}) ->
ok;
-maybe_restart_updater(#st{idx_state=IdxState}=State) ->
+maybe_restart_updater(#st{idx_state = IdxState} = State) ->
couch_index_updater:run(State#st.updater, IdxState).
-
send_all(Waiters, Reply) ->
[gen_server:reply(From, Reply) || {From, _} <- Waiters].
-
send_replies(Waiters, UpdateSeq, IdxState) ->
Pred = fun({_, S}) -> S =< UpdateSeq end,
{ToSend, Remaining} = lists:partition(Pred, Waiters),
@@ -399,9 +390,9 @@ assert_signature_match(Mod, OldIdxState, NewIdxState) ->
commit_compacted(NewIdxState, State) ->
#st{
- mod=Mod,
- idx_state=OldIdxState,
- updater=Updater
+ mod = Mod,
+ idx_state = OldIdxState,
+ updater = Updater
} = State,
{ok, NewIdxState1} = Mod:swap_compacted(OldIdxState, NewIdxState),
% Restart the indexer if it's running.
@@ -414,9 +405,9 @@ commit_compacted(NewIdxState, State) ->
false -> ok
end,
State#st{
- idx_state=NewIdxState1,
- committed=false
- }.
+ idx_state = NewIdxState1,
+ committed = false
+ }.
is_recompaction_enabled(IdxState, #st{mod = Mod}) ->
DbName = binary_to_list(Mod:get(db_name, IdxState)),
@@ -449,7 +440,6 @@ get_value(Section, Key) ->
commit_delay() ->
config:get_integer("query_server_config", "commit_freq", 5) * 1000.
-
group_info_timeout_msec() ->
Timeout = config:get("query_server_config", "group_info_timeout", "5000"),
case Timeout of
@@ -459,7 +449,6 @@ group_info_timeout_msec() ->
list_to_integer(Milliseconds)
end.
-
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
@@ -468,7 +457,7 @@ get(db_name, _, _) ->
get(idx_name, _, _) ->
<<"idx_name">>;
get(signature, _, _) ->
- <<61,237,157,230,136,93,96,201,204,17,137,186,50,249,44,135>>.
+ <<61, 237, 157, 230, 136, 93, 96, 201, 204, 17, 137, 186, 50, 249, 44, 135>>.
setup_all() ->
Ctx = test_util:start_couch(),
@@ -526,7 +515,7 @@ recompaction_configuration_tests() ->
EnabledCases = [
[undefined, undefined, undefined],
- [undefined, undefined,"enabled"],
+ [undefined, undefined, "enabled"],
[undefined, "enabled", undefined],
[undefined, "disabled", "enabled"],
[undefined, "enabled", "enabled"],
@@ -563,21 +552,26 @@ recompaction_configuration_tests() ->
?assertEqual([], AllCases -- (EnabledCases ++ DisabledCases)),
- [{Settings, fun should_not_call_recompact/2} || Settings <- DisabledCases]
- ++
- [{Settings, fun should_call_recompact/2} || Settings <- EnabledCases].
+ [{Settings, fun should_not_call_recompact/2} || Settings <- DisabledCases] ++
+ [{Settings, fun should_call_recompact/2} || Settings <- EnabledCases].
should_call_recompact(Settings, {IdxState, State}) ->
- {test_id(Settings), ?_test(begin
- ?assert(is_recompaction_enabled(IdxState, State)),
- ok
- end)}.
+ {
+ test_id(Settings),
+ ?_test(begin
+ ?assert(is_recompaction_enabled(IdxState, State)),
+ ok
+ end)
+ }.
should_not_call_recompact(Settings, {IdxState, State}) ->
- {test_id(Settings), ?_test(begin
- ?assertNot(is_recompaction_enabled(IdxState, State)),
- ok
- end)}.
+ {
+ test_id(Settings),
+ ?_test(begin
+ ?assertNot(is_recompaction_enabled(IdxState, State)),
+ ok
+ end)
+ }.
to_string(undefined) -> "undefined";
to_string(Value) -> Value.
@@ -586,7 +580,6 @@ test_id(Settings0) ->
Settings1 = [to_string(Value) || Value <- Settings0],
"[ " ++ lists:flatten(string:join(Settings1, " , ")) ++ " ]".
-
get_group_timeout_info_test_() ->
{
foreach,
@@ -598,25 +591,28 @@ get_group_timeout_info_test_() ->
]
}.
-
t_group_timeout_info_integer() ->
- ?_test(begin
- meck:expect(config, get,
+ ?_test(begin
+ meck:expect(
+ config,
+ get,
fun("query_server_config", "group_info_timeout", _) ->
- "5001"
- end),
+ "5001"
+ end
+ ),
?assertEqual(5001, group_info_timeout_msec())
end).
-
t_group_timeout_info_infinity() ->
- ?_test(begin
- meck:expect(config, get,
+ ?_test(begin
+ meck:expect(
+ config,
+ get,
fun("query_server_config", "group_info_timeout", _) ->
"infinity"
- end),
+ end
+ ),
?assertEqual(infinity, group_info_timeout_msec())
end).
-
-endif.