summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbenoitc <benoitc@apache.org>2014-02-08 19:55:40 +0100
committerbenoitc <benoitc@apache.org>2014-02-08 20:00:57 +0100
commitc94569a675741f099b7cb62f87599bbd61e907f9 (patch)
treea6bf01a4ea1ea260a26da4dc4cf2da3b19814b9e
parent2bf88e3ff03cc674f3001ec93240378cada5bde5 (diff)
downloadcouchdb-c94569a675741f099b7cb62f87599bbd61e907f9.tar.gz
couch_index: add background indexing facility
This change adds the ability to trigger view indexing in the background. Background indexing only runs while at least one process has acquired the indexer using the `couch_index_server:acquire_indexer/3` function. If all the processes that acquired it are down or have released it using `couch_index_server:release_indexer/3`, the background task is stopped. By default the background indexing happens every 1s or when 200 docs have been saved in the database. These parameters can be changed using the options `threshold` and `refresh_interval` in the couch_index section. To use it with couch_mrview, a new option {refresh, true} has been added to couch_mrview_changes:handle_changes. The query parameter refresh=true is also passed in the HTTP changes API.
-rw-r--r--apps/couch_httpd/src/couch_httpd_changes.erl18
-rw-r--r--apps/couch_index/src/couch_index.erl35
-rw-r--r--apps/couch_index/src/couch_index_indexer.erl189
-rw-r--r--apps/couch_index/src/couch_index_server.erl18
-rw-r--r--apps/couch_mrview/src/couch_mrview_changes.erl56
-rw-r--r--apps/couch_mrview/test/10-index-changes.t17
-rw-r--r--share/www/script/test/changes.js19
7 files changed, 330 insertions, 22 deletions
diff --git a/apps/couch_httpd/src/couch_httpd_changes.erl b/apps/couch_httpd/src/couch_httpd_changes.erl
index 82d9fe016..510e20a2e 100644
--- a/apps/couch_httpd/src/couch_httpd_changes.erl
+++ b/apps/couch_httpd/src/couch_httpd_changes.erl
@@ -177,8 +177,11 @@ handle_view_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions,
since = Since,
db_open_options = DbOptions} = ChangesArgs,
+ Refresh = refresh_option(Req),
+
Options0 = [{since, Since},
- {view_options, ViewOptions}],
+ {view_options, ViewOptions},
+ {refresh, Refresh}],
Options = case ResponseType of
"continuous" -> [stream | Options0];
"eventsource" -> [stream | Options0];
@@ -236,9 +239,9 @@ view_changes_cb({{Seq, _Key, DocId}, _VAl},
%% if we achieved the limit, stop here, else continue.
NewLimit = OldLimit + 1,
if Limit > NewLimit ->
- {ok, {<<",\n">>, Db, NewLimit, Callback, Args}};
+ {ok, {<<",\n">>, NewLimit, Db, Callback, Args}};
true ->
- {stop, {<<"">>, Db, NewLimit, Callback, Args}}
+ {stop, {<<"">>, NewLimit, Db, Callback, Args}}
end;
{error, not_found} ->
%% doc not found, continue
@@ -416,6 +419,15 @@ parse_view_options([{K, V} | Rest], Acc) ->
end,
parse_view_options(Rest, Acc1).
+%% @doc Read the `refresh' flag of a view changes request as a boolean.
+%%
+%% For a `{json_req, {Props}}' request the flag is looked up under the
+%% <<"refresh">> key of the <<"query">> object; for any other request it
+%% is read from the "refresh" query string value. The flag defaults to
+%% `true' and only an explicit false (in any of the encodings accepted by
+%% to_refresh_flag/1) turns it off.
+refresh_option({json_req, {Props}}) ->
+    {Query} = couch_util:get_value(<<"query">>, Props),
+    to_refresh_flag(couch_util:get_value(<<"refresh">>, Query, true));
+refresh_option(Req) ->
+    to_refresh_flag(couch_httpd:qs_value(Req, "refresh", "true")).
+
+%% Normalise a raw refresh value to a boolean. The result is handed to
+%% couch_mrview_changes as the `refresh' option, which only understands
+%% `true' and `false', so every spelling of false is mapped to `false'
+%% and anything else enables the refresh.
+to_refresh_flag(false) -> false;
+to_refresh_flag("false") -> false;
+to_refresh_flag(<<"false">>) -> false;
+to_refresh_flag(_) -> true.
+
parse_json(V) when is_list(V) ->
?JSON_DECODE(V);
parse_json(V) ->
diff --git a/apps/couch_index/src/couch_index.erl b/apps/couch_index/src/couch_index.erl
index b9ae567d0..01483bb57 100644
--- a/apps/couch_index/src/couch_index.erl
+++ b/apps/couch_index/src/couch_index.erl
@@ -17,6 +17,7 @@
%% API
-export([start_link/1, stop/1, get_state/2, get_info/1]).
-export([compact/1, compact/2, get_compactor_pid/1]).
+-export([acquire_indexer/1, release_indexer/1]).
-export([config_change/3]).
%% gen_server callbacks
@@ -30,6 +31,7 @@
idx_state,
updater,
compactor,
+ indexer=nil,
waiters=[],
commit_delay,
committed=true,
@@ -68,6 +70,16 @@ compact(Pid, Options) ->
get_compactor_pid(Pid) ->
gen_server:call(Pid, get_compactor_pid).
+
+%% @doc Acquire the background indexer of the index process `Pid' on
+%% behalf of the calling process. The index is first asked for its
+%% indexer pid, then the indexer is sent an `acquire' call for self().
+acquire_indexer(Pid) ->
+    {ok, Indexer} = gen_server:call(Pid, get_indexer_pid),
+    Caller = self(),
+    gen_server:call(Indexer, {acquire, Caller}).
+
+%% @doc Release the background indexer of the index process `Pid' on
+%% behalf of the calling process. The index is first asked for its
+%% indexer pid, then the indexer is sent a `release' call for self().
+release_indexer(Pid) ->
+    {ok, Indexer} = gen_server:call(Pid, get_indexer_pid),
+    Caller = self(),
+    gen_server:call(Indexer, {release, Caller}).
+
+
config_change("query_server_config", "commit_freq", NewValue) ->
ok = gen_server:cast(?MODULE, {config_update, NewValue}).
@@ -88,6 +100,7 @@ init({Mod, IdxState}) ->
{ok, NewIdxState} ->
{ok, UPid} = couch_index_updater:start_link(self(), Mod),
{ok, CPid} = couch_index_compactor:start_link(self(), Mod),
+
Delay = couch_config:get("query_server_config", "commit_freq", "5"),
MsDelay = 1000 * list_to_integer(Delay),
State = #st{
@@ -196,7 +209,18 @@ handle_call({compacted, NewIdxState}, _From, State) ->
}};
_ ->
{reply, recompact, State}
- end.
+ end;
+handle_call(get_indexer_pid, _From, #st{mod=Mod, idx_state=IdxState}=State) ->
+ Pid = case State#st.indexer of
+ Pid1 when is_pid(Pid1) ->
+ Pid1;
+ _ ->
+ DbName = Mod:get(db_name, IdxState),
+ {ok, IPid} = couch_index_indexer:start_link(self(), DbName),
+ erlang:monitor(process, IPid),
+ IPid
+ end,
+ {reply, {ok, Pid}, State#st{indexer=Pid}}.
handle_cast({config_change, NewDelay}, State) ->
@@ -318,6 +342,15 @@ handle_info(commit, State) ->
erlang:send_after(Delay, self(), commit),
{noreply, State}
end;
+
+handle_info({'DOWN', _, _, Pid, _}, #st{mod=Mod, idx_state=IdxState,
+ indexer=Pid}=State) ->
+ Args = [Mod:get(db_name, IdxState),
+ Mod:get(idx_name, IdxState)],
+ ?LOG_INFO("Background indexer shutdown by monitor notice for db: ~s idx: ~s", Args),
+
+ {noreply, State#st{indexer=nil}};
+
handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) ->
DbName = Mod:get(db_name, IdxState),
DDocId = Mod:get(idx_name, IdxState),
diff --git a/apps/couch_index/src/couch_index_indexer.erl b/apps/couch_index/src/couch_index_indexer.erl
new file mode 100644
index 000000000..4af85cfd5
--- /dev/null
+++ b/apps/couch_index/src/couch_index_indexer.erl
@@ -0,0 +1,189 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_indexer).
+
+-export([start_link/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% Server state of the background indexer.
+%%   index      - first argument given to start_link/2; refresh_index/2
+%%                passes it to couch_index:get_state/2
+%%   dbname     - name of the database whose update events are counted
+%%   db_updates - number of `updated' casts counted since the last reset
+%%   tref       - reference of the refresh timer, nil until it is started
+%%   notifier   - pid of the db update notifier, nil until it is started
+%%   locks      - dict mapping an acquiring pid to {MonitorRef, RefCount}
+-record(state, {index,
+ dbname,
+ db_updates=0,
+ tref=nil,
+ notifier=nil,
+ locks}).
+
+
+%% @doc Start a background indexer linked to the caller. `Index' and
+%% `DbName' are handed to init/1 as a single tuple.
+start_link(Index, DbName) ->
+    InitArgs = {Index, DbName},
+    gen_server:start_link(?MODULE, InitArgs, []).
+
+%% gen_server init callback: trap exits, then build the initial state
+%% with an empty lock table.
+init({Index, DbName}) ->
+    process_flag(trap_exit, true),
+    InitialState = #state{index=Index,
+                          dbname=DbName,
+                          locks=dict:new()},
+    %% the notifier and the timer are only started once this message is
+    %% handled, so init/1 itself returns immediately
+    self() ! start_indexing,
+    {ok, InitialState}.
+
+%% gen_server call handler.
+%%
+%% {acquire, Pid}: take one more reference on the indexer for Pid. The
+%% first reference of a pid also installs a monitor on it.
+%% {release, Pid}: drop one reference of Pid; dropping the last one
+%% removes its monitor. The server stops once should_close/0 is true.
+%% stop: stop the server.
+handle_call({acquire, Pid}, _From, #state{locks=Locks0}=State) ->
+    Entry = case dict:find(Pid, Locks0) of
+        {ok, {MRef, Count}} ->
+            {MRef, Count + 1};
+        error ->
+            {erlang:monitor(process, Pid), 1}
+    end,
+    {reply, ok, State#state{locks=dict:store(Pid, Entry, Locks0)}};
+
+handle_call({release, Pid}, _From, #state{locks=Locks0}=State0) ->
+    Locks = case dict:find(Pid, Locks0) of
+        error ->
+            %% Pid never acquired us, nothing to release
+            Locks0;
+        {ok, {MRef, 1}} ->
+            %% last reference: forget the pid and its monitor
+            erlang:demonitor(MRef, [flush]),
+            dict:erase(Pid, Locks0);
+        {ok, {MRef, Count}} ->
+            dict:store(Pid, {MRef, Count - 1}, Locks0)
+    end,
+    State = State0#state{locks=Locks},
+    case should_close() of
+        false -> {reply, ok, State};
+        true -> {stop, normal, ok, State}
+    end;
+
+handle_call(stop, _From, State) ->
+    {stop, normal, ok, State}.
+
+%% gen_server cast handler.
+%%
+%% `updated' is cast by the db notifier each time the watched database
+%% is updated. The updates are counted and the index is only refreshed
+%% once the count reaches the configured threshold; otherwise the
+%% refresh is left to the timer.
+handle_cast(updated, #state{index=Index, dbname=DbName,
+                            db_updates=Updates}=State) ->
+    NUpdates = Updates + 1,
+
+    %% `>=' rather than `=:=': the threshold is read from the config on
+    %% every update, so it may have been lowered below the current count
+    %% (or set to 0), in which case an exact comparison would never match.
+    case NUpdates >= get_db_threshold() of
+        true ->
+            refresh_index(DbName, Index),
+            {noreply, State#state{db_updates=0}};
+        false ->
+            {noreply, State#state{db_updates=NUpdates}}
+    end;
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%% gen_server info handler.
+%%
+%% start_indexing: sent by init/1 to itself; starts the db update
+%% notifier and the first refresh timer.
+%% {timeout, TRef, refresh_index}: the refresh timer fired; refresh the
+%% index if updates were counted, reset the counter and re-arm the timer.
+%% {'DOWN', ...}: a process that acquired the indexer died; drop its
+%% lock and stop the server once should_close/0 is true.
+%% {'EXIT', Notifier, _}: the db notifier exited; stop the server.
+%% Any other message is ignored.
+handle_info(start_indexing, #state{dbname=DbName}=State) ->
+    %% start the db notifier to watch db update events
+    {ok, NotifierPid} = start_db_notifier(DbName),
+
+    %% start the timer
+    TRef = erlang:start_timer(get_refresh_interval(), self(),
+                              refresh_index),
+
+    {noreply, State#state{tref=TRef, notifier=NotifierPid}};
+
+handle_info({timeout, TRef, refresh_index}, #state{index=Index,
+                                                   dbname=DbName,
+                                                   tref=TRef,
+                                                   db_updates=N}=State) ->
+    %% only refresh the index if an update happened
+    case N > 0 of
+        true ->
+            refresh_index(DbName, Index);
+        false ->
+            ok
+    end,
+
+    %% erlang:start_timer/3 is one-shot: re-arm it so that the refresh
+    %% keeps happening at every interval.
+    NewTRef = erlang:start_timer(get_refresh_interval(), self(),
+                                 refresh_index),
+
+    %% update the record (a `#state{db_updates=0}=State' match would
+    %% crash with badmatch as soon as N > 0).
+    {noreply, State#state{db_updates=0, tref=NewTRef}};
+
+handle_info({'DOWN', MRef, _, Pid, _}, #state{locks=Locks}=State) ->
+    NLocks = case dict:find(Pid, Locks) of
+        {ok, {MRef, _}} ->
+            dict:erase(Pid, Locks);
+        error ->
+            Locks
+    end,
+
+    NState = State#state{locks=NLocks},
+
+    case should_close() of
+        true -> {stop, normal, NState};
+        false -> {noreply, NState}
+    end;
+
+handle_info({'EXIT', Pid, _Reason}, #state{notifier=Pid}=State) ->
+    %% db notifier exited
+    {stop, normal, State#state{notifier=nil}};
+
+handle_info(_Info, State) ->
+    %% the process traps exits and may receive stray messages; ignore
+    %% them instead of crashing with function_clause
+    {noreply, State}.
+
+%% gen_server code_change callback: the state is kept as it is.
+code_change(_FromVsn, CurrentState, _Extra) ->
+    {ok, CurrentState}.
+
+%% gen_server terminate callback: cancel the refresh timer and shut the
+%% db notifier down, when they were started.
+terminate(_Reason, #state{tref=Timer, notifier=Notifier}) ->
+    case Timer of
+        nil -> ok;
+        _ -> erlang:cancel_timer(Timer)
+    end,
+
+    if
+        is_pid(Notifier) -> couch_util:shutdown_sync(Notifier);
+        true -> ok
+    end,
+    ok.
+
+%% Refresh the index to trigger updates: read the current update
+%% sequence of the database `Db' and ask the index for its state at that
+%% sequence.
+%%
+%% Returns `ok' when couch_index:get_state/2 returns `{ok, _}' and
+%% `{error, Reason}' otherwise. A failure of the get_state call itself
+%% is reported the same way instead of crashing the indexer; errors
+%% raised while reading the update sequence are not caught.
+refresh_index(Db, Index) ->
+    UpdateSeq = couch_util:with_db(Db, fun(WDb) ->
+        couch_db:get_update_seq(WDb)
+    end),
+
+    %% try ... of keeps only the get_state call protected and, unlike the
+    %% old-style `catch', does not mix exceptions up with return values.
+    try couch_index:get_state(Index, UpdateSeq) of
+        {ok, _} -> ok;
+        Other -> {error, Other}
+    catch
+        throw:Thrown -> {error, Thrown};
+        Class:Reason -> {error, {Class, Reason}}
+    end.
+
+%% The server may stop when nobody holds it anymore, which is detected
+%% by this process having no monitor left.
+should_close() ->
+    {monitors, Monitors} = process_info(self(), monitors),
+    Monitors =:= [].
+
+
+%% Maximum number of db updates to count before refreshing the index.
+%% The index is not refreshed on each db update; if this minimum is not
+%% reached, the refresh is left to the next interval. Read from the
+%% "threshold" option of the "couch_index" section, "200" by default.
+get_db_threshold() ->
+    Value = couch_config:get("couch_index", "threshold", "200"),
+    list_to_integer(Value).
+
+%% Interval, in ms, at which the index is refreshed. Read from the
+%% "refresh_interval" option of the "couch_index" section, "1000" by
+%% default.
+get_refresh_interval() ->
+    Value = couch_config:get("couch_index", "refresh_interval", "1000"),
+    list_to_integer(Value).
+
+%% Start a linked db update notifier which casts `updated' to this
+%% process for every `{updated, Name}' event whose name is `DbName';
+%% all other events are dropped.
+start_db_notifier(DbName) ->
+    Indexer = self(),
+    Notify = fun
+        ({updated, Name}) ->
+            case Name =:= DbName of
+                true -> gen_server:cast(Indexer, updated);
+                false -> ok
+            end;
+        (_Other) ->
+            ok
+    end,
+    couch_db_update_notifier:start_link(Notify).
diff --git a/apps/couch_index/src/couch_index_server.erl b/apps/couch_index/src/couch_index_server.erl
index 86791db5d..2c6ebc908 100644
--- a/apps/couch_index/src/couch_index_server.erl
+++ b/apps/couch_index/src/couch_index_server.erl
@@ -14,6 +14,7 @@
-behaviour(gen_server).
-export([start_link/0, get_index/4, get_index/3, get_index/2]).
+-export([acquire_indexer/3, release_indexer/3]).
-export([config_change/2, update_notify/1]).
-export([init/1, terminate/2, code_change/3]).
@@ -67,6 +68,23 @@ get_index(Module, IdxState) ->
gen_server:call(?MODULE, {get_index, Args}, infinity)
end.
+%% @doc Look the index up with get_index/3 and acquire its background
+%% indexer for the calling process. Anything other than `{ok, Pid}'
+%% returned by the lookup is returned unchanged.
+acquire_indexer(Module, DbName, DDoc) ->
+    Lookup = get_index(Module, DbName, DDoc),
+    case Lookup of
+        {ok, IndexPid} -> couch_index:acquire_indexer(IndexPid);
+        _ -> Lookup
+    end.
+
+%% @doc Look the index up with get_index/3 and release its background
+%% indexer for the calling process. Anything other than `{ok, Pid}'
+%% returned by the lookup is returned unchanged.
+release_indexer(Module, DbName, DDoc) ->
+    Lookup = get_index(Module, DbName, DDoc),
+    case Lookup of
+        {ok, IndexPid} -> couch_index:release_indexer(IndexPid);
+        _ -> Lookup
+    end.
+
+
init([]) ->
process_flag(trap_exit, true),
diff --git a/apps/couch_mrview/src/couch_mrview_changes.erl b/apps/couch_mrview/src/couch_mrview_changes.erl
index a0e528147..c31624e4d 100644
--- a/apps/couch_mrview/src/couch_mrview_changes.erl
+++ b/apps/couch_mrview/src/couch_mrview_changes.erl
@@ -28,14 +28,16 @@
heartbeat,
timeout_acc=0,
notifier,
- stream}).
+ stream,
+ refresh}).
-type changes_stream() :: true | false | once.
-type changes_options() :: [{stream, changes_stream()} |
{since, integer()} |
{view_options, list()} |
{timeout, integer()} |
- {heartbeat, true | integer()}].
+ {heartbeat, true | integer()} |
+ {refresh, true | false}].
-export_type([changes_stream/0]).
-export_type([changes_options/0]).
@@ -47,6 +49,7 @@ handle_changes(DbName, DDocId, View, Fun, Acc, Options) ->
Since = proplists:get_value(since, Options, 0),
Stream = proplists:get_value(stream, Options, false),
ViewOptions = proplists:get_value(view_options, Options, []),
+ Refresh = proplists:get_value(refresh, Options, false),
State0 = #vst{dbname=DbName,
ddoc=DDocId,
@@ -56,20 +59,25 @@ handle_changes(DbName, DDocId, View, Fun, Acc, Options) ->
callback=Fun,
acc=Acc},
- case view_changes_since(State0) of
- {ok, #vst{since=LastSeq, acc=Acc2}=State} ->
- case Stream of
- true ->
- start_loop(State#vst{stream=true}, Options);
- once when LastSeq =:= Since ->
- start_loop(State#vst{stream=once}, Options);
- _ ->
- Fun(stop, {LastSeq, Acc2})
- end;
- {stop, #vst{since=LastSeq, acc=Acc2}} ->
- Fun(stop, {LastSeq, Acc2});
- Error ->
- Error
+ maybe_acquire_indexer(Refresh, DbName, DDocId),
+ try
+ case view_changes_since(State0) of
+ {ok, #vst{since=LastSeq, acc=Acc2}=State} ->
+ case Stream of
+ true ->
+ start_loop(State#vst{stream=true}, Options);
+ once when LastSeq =:= Since ->
+ start_loop(State#vst{stream=once}, Options);
+ _ ->
+ Fun(stop, {LastSeq, Acc2})
+ end;
+ {stop, #vst{since=LastSeq, acc=Acc2}} ->
+ Fun(stop, {LastSeq, Acc2});
+ Error ->
+ Error
+ end
+ after
+ maybe_release_indexer(Refresh, DbName, DDocId)
end.
start_loop(#vst{dbname=DbName, ddoc=DDocId}=State, Options) ->
@@ -169,3 +177,19 @@ index_update_notifier(DbName, DDocId) ->
ok
end),
NotifierPid.
+
+%% When a refresh was requested, acquire the background indexing task
+%% of the view index through couch_index_server so that it can be
+%% started; do nothing otherwise.
+maybe_acquire_indexer(true, DbName, DDocId) ->
+    couch_index_server:acquire_indexer(couch_mrview_index, DbName, DDocId);
+maybe_acquire_indexer(false, _DbName, _DDocId) ->
+    ok.
+
+%% When a refresh was requested, release the background indexing task
+%% of the view index through couch_index_server so that it can
+%% eventually be stopped; do nothing otherwise.
+maybe_release_indexer(true, DbName, DDocId) ->
+    couch_index_server:release_indexer(couch_mrview_index, DbName, DDocId);
+maybe_release_indexer(false, _DbName, _DDocId) ->
+    ok.
diff --git a/apps/couch_mrview/test/10-index-changes.t b/apps/couch_mrview/test/10-index-changes.t
index 627376ff5..f53e9edcf 100644
--- a/apps/couch_mrview/test/10-index-changes.t
+++ b/apps/couch_mrview/test/10-index-changes.t
@@ -15,7 +15,7 @@
% the License.
main(_) ->
- etap:plan(6),
+ etap:plan(8),
case (catch test()) of
ok ->
etap:end_tests();
@@ -35,6 +35,7 @@ test() ->
test_stream_once_timeout(Db),
test_stream_once_heartbeat(Db),
test_stream(Db),
+ test_indexer(Db),
test_util:stop_couch(),
ok.
@@ -173,6 +174,20 @@ test_stream(Db) ->
end.
+%% Query the changes since seq 14, save one more doc, wait one second
+%% and query again: the second result is expected to include the new doc.
+test_indexer(Db) ->
+    Since = [{since, 14}],
+    FirstRow = {{15,14,<<"14">>},14},
+
+    Initial = run_query(Db, Since),
+    etap:is(Initial, {ok, 15, [FirstRow]}, "refresh index by hand OK."),
+
+    %% the returned db handle is not needed, only the successful save
+    {ok, _Db1} = save_doc(Db, 15),
+    timer:sleep(1000),
+
+    Refreshed = run_query(Db, Since),
+    SecondRow = {{16,15,<<"15">>},15},
+    etap:is(Refreshed, {ok, 16, [FirstRow, SecondRow]},
+            "changes indexed in background OK."),
+    ok.
+
+
save_doc(Db, Id) ->
Doc = couch_mrview_test_util:doc(Id),
{ok, _Rev} = couch_db:update_doc(Db, Doc, []),
diff --git a/share/www/script/test/changes.js b/share/www/script/test/changes.js
index 3419eb68c..2822a9080 100644
--- a/share/www/script/test/changes.js
+++ b/share/www/script/test/changes.js
@@ -295,7 +295,7 @@ couchTests.changes = function(debug) {
},
blah: {
map : 'function(doc) {' +
- ' if (doc._id == "blah") {' +
+ ' if ((doc._id == "blah") || (doc._id == "blah2")) {' +
' emit("test", null);' +
' }' +
'}'
@@ -453,6 +453,23 @@ couchTests.changes = function(debug) {
TEquals(400, req.status, "should return 400 for when use_index=no");
+ T(db.save({"_id":"blah2", "bop" : "plankton"}).ok);
+
+ var req = CouchDB.request("GET", "/test_suite_db/_changes?filter=_view&view=changes_seq_indexed/blah");
+ var resp = JSON.parse(req.responseText);
+ T(resp.results.length === 2);
+ T(resp.results[0].id === "blah");
+ T(resp.results[1].id === "blah2");
+
+ var req = CouchDB.request("GET", '/test_suite_db/_changes?filter=_view&view=changes_seq_indexed/blah&key="test"');
+ var resp = JSON.parse(req.responseText);
+ T(resp.results.length === 2);
+ T(resp.results[0].id === "blah");
+ T(resp.results[1].id === "blah2");
+
+
+
+
// test for userCtx
run_on_modified_server(
[{section: "httpd",