summaryrefslogtreecommitdiff
path: root/src/nouveau/src/nouveau_index_manager.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/nouveau/src/nouveau_index_manager.erl')
-rw-r--r--src/nouveau/src/nouveau_index_manager.erl161
1 files changed, 161 insertions, 0 deletions
diff --git a/src/nouveau/src/nouveau_index_manager.erl b/src/nouveau/src/nouveau_index_manager.erl
new file mode 100644
index 000000000..bfbd74990
--- /dev/null
+++ b/src/nouveau/src/nouveau_index_manager.erl
@@ -0,0 +1,161 @@
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+%% index manager ensures only one process is updating a nouveau index at a time.
+%% calling update_index will block until at least one attempt has been made to
+%% make the index as current as the database at the time update_index was called.
+
+-module(nouveau_index_manager).
+-behaviour(gen_server).
+-behaviour(config_listener).
+-include("nouveau.hrl").
+
+%% public api
+-export([
+ update_index/1
+]).
+
+%% gen_server bits
+-export([
+ start_link/0,
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2
+]).
+
+% config_listener api
+-export([handle_config_change/5, handle_config_terminate/3]).
+
+-export([handle_db_event/3]).
+
+-define(BY_DBSIG, nouveau_by_dbsig).
+-define(BY_REF, nouveau_by_ref).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+update_index(#index{} = Index) ->
+ case nouveau:enabled() of
+ true ->
+ gen_server:call(?MODULE, {update, Index}, infinity);
+ false ->
+ {error, nouveau_not_enabled}
+ end.
+
+init(_) ->
+ couch_util:set_mqd_off_heap(?MODULE),
+ ets:new(?BY_DBSIG, [set, named_table]),
+ ets:new(?BY_REF, [set, named_table]),
+ couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
+ configure_ibrowse(nouveau_util:nouveau_url()),
+ ok = config:listen_for_changes(?MODULE, nil),
+ {ok, nil}.
+
+handle_call({update, #index{} = Index0}, From, State) ->
+ DbSig = {Index0#index.dbname, Index0#index.sig},
+ case ets:lookup(?BY_DBSIG, DbSig) of
+ [] ->
+ {_IndexerPid, IndexerRef} = spawn_monitor(nouveau_index_updater, update, [Index0]),
+ Queue = queue:in(From, queue:new()),
+ true = ets:insert(?BY_DBSIG, {DbSig, Index0, Queue}),
+ true = ets:insert(?BY_REF, {IndexerRef, DbSig});
+ [{_DbSig, Index1, Queue}] ->
+ ets:insert(?BY_DBSIG, {DbSig, Index1, queue:in(From, Queue)})
+ end,
+ {noreply, State};
+handle_call(_Msg, _From, State) ->
+ {reply, unexpected_msg, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', IndexerRef, process, _Pid, Reason}, State) ->
+ case ets:lookup(?BY_REF, IndexerRef) of
+ [] ->
+ % not one of ours, somehow...
+ {noreply, State};
+ [{_, DbSig}] ->
+ true = ets:delete(?BY_REF, IndexerRef),
+ [{_, Index, Queue0}] = ets:lookup(?BY_DBSIG, DbSig),
+ {{value, From}, Queue1} = queue:out(Queue0),
+ case Reason of
+ normal ->
+ gen_server:reply(From, ok);
+ {error, Msg} ->
+ couch_log:error(
+ "~p: db:~s ddoc:~s index:~s failed with: ~p",
+ [
+ ?MODULE,
+ mem3:dbname(Index#index.dbname),
+ Index#index.ddoc_id,
+ Index#index.name,
+ Msg
+ ]
+ ),
+ gen_server:reply(From, {error, Msg})
+ end,
+ case queue:is_empty(Queue1) of
+ true ->
+ true = ets:delete(?BY_DBSIG, DbSig);
+ false ->
+ {_IndexerPid, NewIndexerRef} = spawn_monitor(nouveau_index_updater, update, [
+ Index
+ ]),
+ true = ets:insert(?BY_DBSIG, {DbSig, Index, Queue1}),
+ true = ets:insert(?BY_REF, {NewIndexerRef, DbSig})
+ end,
+ {noreply, State}
+ end;
+handle_info(restart_config_listener, State) ->
+ ok = config:listen_for_changes(?MODULE, nil),
+ {noreply, State};
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+handle_db_event(DbName, deleted, State) ->
+ couch_log:notice("Deleting indexes for ~s as database was deleted", [DbName]),
+ nouveau_api:delete_path(nouveau_util:index_name(DbName)),
+ {ok, State};
+handle_db_event(_DbName, _Event, State) ->
+ {ok, State}.
+
+handle_config_change("nouveau", "url", URL, _Persist, State) ->
+ configure_ibrowse(URL),
+ {ok, State};
+handle_config_change(_Section, _Key, _Value, _Persist, State) ->
+ {ok, State}.
+
+handle_config_terminate(_Server, stop, _State) ->
+ ok;
+handle_config_terminate(_Server, _Reason, _State) ->
+ erlang:send_after(
+ 5000,
+ whereis(?MODULE),
+ restart_config_listener
+ ).
+
+configure_ibrowse(URL) ->
+ #{host := Host, port := Port} = uri_string:parse(URL),
+ ibrowse:set_max_sessions(
+ Host,
+ Port,
+ config:get_integer("nouveau", "max_sessions", 100)
+ ),
+ ibrowse:set_max_pipeline_size(
+ Host,
+ Port,
+ config:get_integer("nouveau", "max_pipeline_size", 1000)
+ ).