+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%% index manager ensures only one process is updating a nouveau index at a time.
+%% calling update_index will block until at least one attempt has been made to
+%% make the index as current as the database at the time update_index was called.
+%% public api
+ update_index/1
+%% gen_server bits
+ start_link/0,
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2
+% config_listener api
+-export([handle_config_change/5, handle_config_terminate/3]).
+-define(BY_DBSIG, nouveau_by_dbsig).
+-define(BY_REF, nouveau_by_ref).
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+update_index(#index{} = Index) ->
+ case nouveau:enabled() of
+ true ->
+ gen_server:call(?MODULE, {update, Index}, infinity);
+ false ->
+ {error, nouveau_not_enabled}
+ end.
+init(_) ->
+ couch_util:set_mqd_off_heap(?MODULE),
+ ets:new(?BY_DBSIG, [set, named_table]),
+ ets:new(?BY_REF, [set, named_table]),
+ couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]),
+ configure_ibrowse(nouveau_util:nouveau_url()),
+ ok = config:listen_for_changes(?MODULE, nil),
+ {ok, nil}.
+handle_call({update, #index{} = Index0}, From, State) ->
+ DbSig = {Index0#index.dbname, Index0#index.sig},
+ case ets:lookup(?BY_DBSIG, DbSig) of
+ [] ->
+ {_IndexerPid, IndexerRef} = spawn_monitor(nouveau_index_updater, update, [Index0]),
+ Queue = queue:in(From, queue:new()),
+ true = ets:insert(?BY_DBSIG, {DbSig, Index0, Queue}),
+ true = ets:insert(?BY_REF, {IndexerRef, DbSig});
+ [{_DbSig, Index1, Queue}] ->
+ ets:insert(?BY_DBSIG, {DbSig, Index1, queue:in(From, Queue)})
+ end,
+ {noreply, State};
+handle_call(_Msg, _From, State) ->
+ {reply, unexpected_msg, State}.
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+handle_info({'DOWN', IndexerRef, process, _Pid, Reason}, State) ->
+ case ets:lookup(?BY_REF, IndexerRef) of
+ [] ->
+ % not one of ours, somehow...
+ {noreply, State};
+ [{_, DbSig}] ->
+ true = ets:delete(?BY_REF, IndexerRef),
+ [{_, Index, Queue0}] = ets:lookup(?BY_DBSIG, DbSig),
+ {{value, From}, Queue1} = queue:out(Queue0),
+ case Reason of
+ normal ->
+ gen_server:reply(From, ok);
+ {error, Msg} ->
+ couch_log:error(
+ "~p: db:~s ddoc:~s index:~s failed with: ~p",
+ [
+ mem3:dbname(Index#index.dbname),
+ Index#index.ddoc_id,
+ Msg
+ ]
+ ),
+ gen_server:reply(From, {error, Msg})
+ end,
+ case queue:is_empty(Queue1) of
+ true ->
+ true = ets:delete(?BY_DBSIG, DbSig);
+ false ->
+ {_IndexerPid, NewIndexerRef} = spawn_monitor(nouveau_index_updater, update, [
+ Index
+ ]),
+ true = ets:insert(?BY_DBSIG, {DbSig, Index, Queue1}),
+ true = ets:insert(?BY_REF, {NewIndexerRef, DbSig})
+ end,
+ {noreply, State}
+ end;
+handle_info(restart_config_listener, State) ->
+ ok = config:listen_for_changes(?MODULE, nil),
+ {noreply, State};
+handle_info(_Msg, State) ->
+ {noreply, State}.
+handle_db_event(DbName, deleted, State) ->
+ couch_log:notice("Deleting indexes for ~s as database was deleted", [DbName]),
+ nouveau_api:delete_path(nouveau_util:index_name(DbName)),
+ {ok, State};
+handle_db_event(_DbName, _Event, State) ->
+ {ok, State}.
+handle_config_change("nouveau", "url", URL, _Persist, State) ->
+ configure_ibrowse(URL),
+ {ok, State};
+handle_config_change(_Section, _Key, _Value, _Persist, State) ->
+ {ok, State}.
+handle_config_terminate(_Server, stop, _State) ->
+ ok;
+handle_config_terminate(_Server, _Reason, _State) ->
+ erlang:send_after(
+ 5000,
+ whereis(?MODULE),
+ restart_config_listener
+ ).
+configure_ibrowse(URL) ->
+ #{host := Host, port := Port} = uri_string:parse(URL),
+ ibrowse:set_max_sessions(
+ Host,
+ Port,
+ config:get_integer("nouveau", "max_sessions", 100)
+ ),
+ ibrowse:set_max_pipeline_size(
+ Host,
+ Port,
+ config:get_integer("nouveau", "max_pipeline_size", 1000)
+ ).