diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_httpc_pool.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator_httpc_pool.erl | 184 |
1 files changed, 0 insertions, 184 deletions
diff --git a/src/couch_replicator/src/couch_replicator_httpc_pool.erl b/src/couch_replicator/src/couch_replicator_httpc_pool.erl deleted file mode 100644 index 90234a6a0..000000000 --- a/src/couch_replicator/src/couch_replicator_httpc_pool.erl +++ /dev/null @@ -1,184 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_httpc_pool). --behaviour(gen_server). --vsn(1). - -% public API --export([start_link/2, start_link/3, stop/1]). --export([get_worker/1, release_worker/2, release_worker_sync/2]). - -% gen_server API --export([init/1, handle_call/3, handle_info/2, handle_cast/2]). --export([code_change/3, terminate/2]). - --include_lib("couch/include/couch_db.hrl"). - --import(couch_util, [ - get_value/2 -]). - --record(state, { - url, - proxy_url, - limit, % max # of workers allowed - workers = [], - waiting = queue:new(), % blocked clients waiting for a worker - callers = [] % clients who've been given a worker -}). - - -start_link(Url, Options) -> - start_link(Url, undefined, Options). - -start_link(Url, ProxyUrl, Options) -> - gen_server:start_link(?MODULE, {Url, ProxyUrl, Options}, []). - -stop(Pool) -> - ok = gen_server:call(Pool, stop, infinity). - - -get_worker(Pool) -> - {ok, _Worker} = gen_server:call(Pool, get_worker, infinity). - - -release_worker(Pool, Worker) -> - ok = gen_server:cast(Pool, {release_worker, Worker}). - -release_worker_sync(Pool, Worker) -> - ok = gen_server:call(Pool, {release_worker_sync, Worker}). - -init({Url, ProxyUrl, Options}) -> - process_flag(trap_exit, true), - State = #state{ - url = Url, - proxy_url = ProxyUrl, - limit = get_value(max_connections, Options) - }, - {ok, State}. - - -handle_call(get_worker, From, State) -> - #state{ - waiting = Waiting, - callers = Callers, - url = Url, - proxy_url = ProxyUrl, - limit = Limit, - workers = Workers - } = State, - case length(Workers) >= Limit of - true -> - {noreply, State#state{waiting = queue:in(From, Waiting)}}; - false -> - % If the call to acquire fails, the worker pool will crash with a - % badmatch. - {ok, Worker} = couch_replicator_connection:acquire(Url, ProxyUrl), - NewState = State#state{ - workers = [Worker | Workers], - callers = monitor_client(Callers, Worker, From) - }, - {reply, {ok, Worker}, NewState} - end; - -handle_call(stop, _From, State) -> - {stop, normal, ok, State}; - -handle_call({release_worker_sync, Worker}, _From, State) -> - {reply, ok, release_worker_internal(Worker, State)}. - -handle_cast({release_worker, Worker}, State) -> - {noreply, release_worker_internal(Worker, State)}. - -handle_info({'EXIT', Pid, _Reason}, State) -> - #state{ - url = Url, - proxy_url = ProxyUrl, - workers = Workers, - waiting = Waiting, - callers = Callers - } = State, - NewCallers0 = demonitor_client(Callers, Pid), - case Workers -- [Pid] of - Workers -> - {noreply, State#state{callers = NewCallers0}}; - Workers2 -> - case queue:out(Waiting) of - {empty, _} -> - {noreply, State#state{workers = Workers2, - callers = NewCallers0}}; - {{value, From}, Waiting2} -> - {ok, Worker} = couch_replicator_connection:acquire(Url, ProxyUrl), - NewCallers1 = monitor_client(NewCallers0, Worker, From), - gen_server:reply(From, {ok, Worker}), - NewState = State#state{ - workers = [Worker | Workers2], - waiting = Waiting2, - callers = NewCallers1 - }, - {noreply, NewState} - end - end; - -handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State) -> - case lists:keysearch(Ref, 2, Callers) of - {value, {Worker, Ref}} -> - handle_cast({release_worker, Worker}, State); - false -> - {noreply, State} - end. - -code_change(_OldVsn, #state{}=State, _Extra) -> - {ok, State}. - - -terminate(_Reason, _State) -> - ok. - -monitor_client(Callers, Worker, {ClientPid, _}) -> - [{Worker, erlang:monitor(process, ClientPid)} | Callers]. - -demonitor_client(Callers, Worker) -> - case lists:keysearch(Worker, 1, Callers) of - {value, {Worker, MonRef}} -> - erlang:demonitor(MonRef, [flush]), - lists:keydelete(Worker, 1, Callers); - false -> - Callers - end. - -release_worker_internal(Worker, State) -> - #state{waiting = Waiting, callers = Callers} = State, - NewCallers0 = demonitor_client(Callers, Worker), - case is_process_alive(Worker) andalso - lists:member(Worker, State#state.workers) of - true -> - Workers = case queue:out(Waiting) of - {empty, Waiting2} -> - NewCallers1 = NewCallers0, - couch_replicator_connection:release(Worker), - State#state.workers -- [Worker]; - {{value, From}, Waiting2} -> - NewCallers1 = monitor_client(NewCallers0, Worker, From), - gen_server:reply(From, {ok, Worker}), - State#state.workers - end, - NewState = State#state{ - workers = Workers, - waiting = Waiting2, - callers = NewCallers1 - }, - NewState; - false -> - State#state{callers = NewCallers0} - end. |