diff options
author | Robert Newson <rnewson@apache.org> | 2019-10-24 21:30:02 +0100 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2019-10-28 17:03:34 +0000 |
commit | 55fd2e2ac44f9185a6865dd2e0b64afbb98cce05 (patch) | |
tree | c003cbfca866a5596e5e14f0b583b84363254176 | |
parent | 053d494e698181ae3b0b0698055f5a24e7995172 (diff) | |
download | couchdb-55fd2e2ac44f9185a6865dd2e0b64afbb98cce05.tar.gz |
Include proxy host and port in connection pool key2271-proxy-connection-sharing
Closes #2271
3 files changed, 56 insertions, 27 deletions
diff --git a/src/couch_replicator/src/couch_replicator_connection.erl b/src/couch_replicator/src/couch_replicator_connection.erl index f3e4a864d..f31baf41d 100644 --- a/src/couch_replicator/src/couch_replicator_connection.erl +++ b/src/couch_replicator/src/couch_replicator_connection.erl @@ -30,6 +30,7 @@ -export([ acquire/1, + acquire/2, release/1 ]). @@ -53,6 +54,8 @@ worker, host, port, + proxy_host, + proxy_port, mref }). @@ -72,19 +75,28 @@ init([]) -> ibrowse:add_config([{inactivity_timeout, Interval}]), {ok, #state{close_interval=Interval, timer=Timer}}. +acquire(Url) -> + acquire(Url, undefined). -acquire(URL) when is_binary(URL) -> - acquire(binary_to_list(URL)); +acquire(Url, ProxyUrl) when is_binary(Url) -> + acquire(binary_to_list(Url), ProxyUrl); -acquire(URL0) -> - URL = couch_util:url_strip_password(URL0), - case gen_server:call(?MODULE, {acquire, URL}) of +acquire(Url, ProxyUrl) when is_binary(ProxyUrl) -> + acquire(Url, binary_to_list(ProxyUrl)); + +acquire(Url0, ProxyUrl0) -> + Url = couch_util:url_strip_password(Url0), + ProxyUrl = case ProxyUrl0 of + undefined -> undefined; + _ -> couch_util:url_strip_password(ProxyUrl0) + end, + case gen_server:call(?MODULE, {acquire, Url, ProxyUrl}) of {ok, Worker} -> link(Worker), {ok, Worker}; {error, all_allocated} -> - {ok, Pid} = ibrowse:spawn_link_worker_process(URL), - ok = gen_server:call(?MODULE, {create, URL, Pid}), + {ok, Pid} = ibrowse:spawn_link_worker_process(Url), + ok = gen_server:call(?MODULE, {create, Url, ProxyUrl, Pid}), {ok, Pid}; {error, Reason} -> {error, Reason} @@ -96,11 +108,14 @@ release(Worker) -> gen_server:cast(?MODULE, {release, Worker}). -handle_call({acquire, URL}, From, State) -> +handle_call({acquire, Url, ProxyUrl}, From, State) -> {Pid, _Ref} = From, - case ibrowse_lib:parse_url(URL) of - #url{host=Host, port=Port} -> - Pat = #connection{host=Host, port=Port, mref=undefined, _='_'}, + case {ibrowse_lib:parse_url(Url), parse_proxy_url(ProxyUrl)} of + {#url{host=Host, port=Port}, #url{host=ProxyHost, port=ProxyPort}} -> + Pat = #connection{ + host=Host, port=Port, + proxy_host=ProxyHost, proxy_port=ProxyPort, + mref=undefined, _='_'}, case ets:match_object(?MODULE, Pat, 1) of '$end_of_table' -> {reply, {error, all_allocated}, State}; @@ -111,20 +126,25 @@ handle_call({acquire, URL}, From, State) -> Pid)}), {reply, {ok, Worker#connection.worker}, State} end; - {error, invalid_uri} -> + {{error, invalid_uri}, _} -> + {reply, {error, invalid_uri}, State}; + {_, {error, invalid_uri}} -> {reply, {error, invalid_uri}, State} end; -handle_call({create, URL, Worker}, From, State) -> +handle_call({create, Url, ProxyUrl, Worker}, From, State) -> {Pid, _Ref} = From, - case ibrowse_lib:parse_url(URL) of - #url{host=Host, port=Port} -> + case {ibrowse_lib:parse_url(Url), parse_proxy_url(ProxyUrl)} of + {#url{host=Host, port=Port}, #url{host=ProxyHost, port=ProxyPort}} -> link(Worker), couch_stats:increment_counter([couch_replicator, connection, creates]), true = ets:insert_new( ?MODULE, - #connection{host=Host, port=Port, worker=Worker, + #connection{ + host=Host, port=Port, + proxy_host=ProxyHost, proxy_port=ProxyPort, + worker=Worker, mref=monitor(process, Pid)} ), {reply, ok, State} @@ -239,3 +259,9 @@ handle_config_terminate(_, stop, _) -> handle_config_terminate(_, _, _) -> Pid = whereis(?MODULE), erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener). + + +parse_proxy_url(undefined) -> + #url{host=undefined, port=undefined}; +parse_proxy_url(ProxyUrl) -> + ibrowse_lib:parse_url(ProxyUrl). diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl index e4cf11606..4dce319dc 100644 --- a/src/couch_replicator/src/couch_replicator_httpc.erl +++ b/src/couch_replicator/src/couch_replicator_httpc.erl @@ -45,13 +45,9 @@ setup(Db) -> httpc_pool = nil, url = Url, http_connections = MaxConns, - proxy_url = ProxyURL + proxy_url = ProxyUrl } = Db, - HttpcURL = case ProxyURL of - undefined -> Url; - _ when is_list(ProxyURL) -> ProxyURL - end, - {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL, + {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, ProxyUrl, [{max_connections, MaxConns}]), case couch_replicator_auth:initialize(Db#httpdb{httpc_pool = Pid}) of {ok, Db1} -> diff --git a/src/couch_replicator/src/couch_replicator_httpc_pool.erl b/src/couch_replicator/src/couch_replicator_httpc_pool.erl index 33fb61f1f..90234a6a0 100644 --- a/src/couch_replicator/src/couch_replicator_httpc_pool.erl +++ b/src/couch_replicator/src/couch_replicator_httpc_pool.erl @@ -15,7 +15,7 @@ -vsn(1). % public API --export([start_link/2, stop/1]). +-export([start_link/2, start_link/3, stop/1]). -export([get_worker/1, release_worker/2, release_worker_sync/2]). % gen_server API @@ -30,6 +30,7 @@ -record(state, { url, + proxy_url, limit, % max # of workers allowed workers = [], waiting = queue:new(), % blocked clients waiting for a worker @@ -38,7 +39,10 @@ start_link(Url, Options) -> - gen_server:start_link(?MODULE, {Url, Options}, []). + start_link(Url, undefined, Options). + +start_link(Url, ProxyUrl, Options) -> + gen_server:start_link(?MODULE, {Url, ProxyUrl, Options}, []). stop(Pool) -> ok = gen_server:call(Pool, stop, infinity). @@ -54,10 +58,11 @@ release_worker(Pool, Worker) -> release_worker_sync(Pool, Worker) -> ok = gen_server:call(Pool, {release_worker_sync, Worker}). -init({Url, Options}) -> +init({Url, ProxyUrl, Options}) -> process_flag(trap_exit, true), State = #state{ url = Url, + proxy_url = ProxyUrl, limit = get_value(max_connections, Options) }, {ok, State}. @@ -68,6 +73,7 @@ handle_call(get_worker, From, State) -> waiting = Waiting, callers = Callers, url = Url, + proxy_url = ProxyUrl, limit = Limit, workers = Workers } = State, @@ -77,7 +83,7 @@ handle_call(get_worker, From, State) -> false -> % If the call to acquire fails, the worker pool will crash with a % badmatch. - {ok, Worker} = couch_replicator_connection:acquire(Url), + {ok, Worker} = couch_replicator_connection:acquire(Url, ProxyUrl), NewState = State#state{ workers = [Worker | Workers], callers = monitor_client(Callers, Worker, From) @@ -97,6 +103,7 @@ handle_cast({release_worker, Worker}, State) -> handle_info({'EXIT', Pid, _Reason}, State) -> #state{ url = Url, + proxy_url = ProxyUrl, workers = Workers, waiting = Waiting, callers = Callers @@ -111,7 +118,7 @@ handle_info({'EXIT', Pid, _Reason}, State) -> {noreply, State#state{workers = Workers2, callers = NewCallers0}}; {{value, From}, Waiting2} -> - {ok, Worker} = couch_replicator_connection:acquire(Url), + {ok, Worker} = couch_replicator_connection:acquire(Url, ProxyUrl), NewCallers1 = monitor_client(NewCallers0, Worker, From), gen_server:reply(From, {ok, Worker}), NewState = State#state{ |