summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2019-10-24 21:30:02 +0100
committerRobert Newson <rnewson@apache.org>2019-10-28 17:03:34 +0000
commit55fd2e2ac44f9185a6865dd2e0b64afbb98cce05 (patch)
treec003cbfca866a5596e5e14f0b583b84363254176
parent053d494e698181ae3b0b0698055f5a24e7995172 (diff)
downloadcouchdb-2271-proxy-connection-sharing.tar.gz
Include proxy host and port in connection pool key2271-proxy-connection-sharing
Closes #2271
-rw-r--r--src/couch_replicator/src/couch_replicator_connection.erl58
-rw-r--r--src/couch_replicator/src/couch_replicator_httpc.erl8
-rw-r--r--src/couch_replicator/src/couch_replicator_httpc_pool.erl17
3 files changed, 56 insertions, 27 deletions
diff --git a/src/couch_replicator/src/couch_replicator_connection.erl b/src/couch_replicator/src/couch_replicator_connection.erl
index f3e4a864d..f31baf41d 100644
--- a/src/couch_replicator/src/couch_replicator_connection.erl
+++ b/src/couch_replicator/src/couch_replicator_connection.erl
@@ -30,6 +30,7 @@
-export([
acquire/1,
+ acquire/2,
release/1
]).
@@ -53,6 +54,8 @@
worker,
host,
port,
+ proxy_host,
+ proxy_port,
mref
}).
@@ -72,19 +75,28 @@ init([]) ->
ibrowse:add_config([{inactivity_timeout, Interval}]),
{ok, #state{close_interval=Interval, timer=Timer}}.
+acquire(Url) ->
+ acquire(Url, undefined).
-acquire(URL) when is_binary(URL) ->
- acquire(binary_to_list(URL));
+acquire(Url, ProxyUrl) when is_binary(Url) ->
+ acquire(binary_to_list(Url), ProxyUrl);
-acquire(URL0) ->
- URL = couch_util:url_strip_password(URL0),
- case gen_server:call(?MODULE, {acquire, URL}) of
+acquire(Url, ProxyUrl) when is_binary(ProxyUrl) ->
+ acquire(Url, binary_to_list(ProxyUrl));
+
+acquire(Url0, ProxyUrl0) ->
+ Url = couch_util:url_strip_password(Url0),
+ ProxyUrl = case ProxyUrl0 of
+ undefined -> undefined;
+ _ -> couch_util:url_strip_password(ProxyUrl0)
+ end,
+ case gen_server:call(?MODULE, {acquire, Url, ProxyUrl}) of
{ok, Worker} ->
link(Worker),
{ok, Worker};
{error, all_allocated} ->
- {ok, Pid} = ibrowse:spawn_link_worker_process(URL),
- ok = gen_server:call(?MODULE, {create, URL, Pid}),
+ {ok, Pid} = ibrowse:spawn_link_worker_process(Url),
+ ok = gen_server:call(?MODULE, {create, Url, ProxyUrl, Pid}),
{ok, Pid};
{error, Reason} ->
{error, Reason}
@@ -96,11 +108,14 @@ release(Worker) ->
gen_server:cast(?MODULE, {release, Worker}).
-handle_call({acquire, URL}, From, State) ->
+handle_call({acquire, Url, ProxyUrl}, From, State) ->
{Pid, _Ref} = From,
- case ibrowse_lib:parse_url(URL) of
- #url{host=Host, port=Port} ->
- Pat = #connection{host=Host, port=Port, mref=undefined, _='_'},
+ case {ibrowse_lib:parse_url(Url), parse_proxy_url(ProxyUrl)} of
+ {#url{host=Host, port=Port}, #url{host=ProxyHost, port=ProxyPort}} ->
+ Pat = #connection{
+ host=Host, port=Port,
+ proxy_host=ProxyHost, proxy_port=ProxyPort,
+ mref=undefined, _='_'},
case ets:match_object(?MODULE, Pat, 1) of
'$end_of_table' ->
{reply, {error, all_allocated}, State};
@@ -111,20 +126,25 @@ handle_call({acquire, URL}, From, State) ->
Pid)}),
{reply, {ok, Worker#connection.worker}, State}
end;
- {error, invalid_uri} ->
+ {{error, invalid_uri}, _} ->
+ {reply, {error, invalid_uri}, State};
+ {_, {error, invalid_uri}} ->
{reply, {error, invalid_uri}, State}
end;
-handle_call({create, URL, Worker}, From, State) ->
+handle_call({create, Url, ProxyUrl, Worker}, From, State) ->
{Pid, _Ref} = From,
- case ibrowse_lib:parse_url(URL) of
- #url{host=Host, port=Port} ->
+ case {ibrowse_lib:parse_url(Url), parse_proxy_url(ProxyUrl)} of
+ {#url{host=Host, port=Port}, #url{host=ProxyHost, port=ProxyPort}} ->
link(Worker),
couch_stats:increment_counter([couch_replicator, connection,
creates]),
true = ets:insert_new(
?MODULE,
- #connection{host=Host, port=Port, worker=Worker,
+ #connection{
+ host=Host, port=Port,
+ proxy_host=ProxyHost, proxy_port=ProxyPort,
+ worker=Worker,
mref=monitor(process, Pid)}
),
{reply, ok, State}
@@ -239,3 +259,9 @@ handle_config_terminate(_, stop, _) ->
handle_config_terminate(_, _, _) ->
Pid = whereis(?MODULE),
erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
+
+
+parse_proxy_url(undefined) ->
+ #url{host=undefined, port=undefined};
+parse_proxy_url(ProxyUrl) ->
+ ibrowse_lib:parse_url(ProxyUrl).
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index e4cf11606..4dce319dc 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -45,13 +45,9 @@ setup(Db) ->
httpc_pool = nil,
url = Url,
http_connections = MaxConns,
- proxy_url = ProxyURL
+ proxy_url = ProxyUrl
} = Db,
- HttpcURL = case ProxyURL of
- undefined -> Url;
- _ when is_list(ProxyURL) -> ProxyURL
- end,
- {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL,
+ {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, ProxyUrl,
[{max_connections, MaxConns}]),
case couch_replicator_auth:initialize(Db#httpdb{httpc_pool = Pid}) of
{ok, Db1} ->
diff --git a/src/couch_replicator/src/couch_replicator_httpc_pool.erl b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
index 33fb61f1f..90234a6a0 100644
--- a/src/couch_replicator/src/couch_replicator_httpc_pool.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
@@ -15,7 +15,7 @@
-vsn(1).
% public API
--export([start_link/2, stop/1]).
+-export([start_link/2, start_link/3, stop/1]).
-export([get_worker/1, release_worker/2, release_worker_sync/2]).
% gen_server API
@@ -30,6 +30,7 @@
-record(state, {
url,
+ proxy_url,
limit, % max # of workers allowed
workers = [],
waiting = queue:new(), % blocked clients waiting for a worker
@@ -38,7 +39,10 @@
start_link(Url, Options) ->
- gen_server:start_link(?MODULE, {Url, Options}, []).
+ start_link(Url, undefined, Options).
+
+start_link(Url, ProxyUrl, Options) ->
+ gen_server:start_link(?MODULE, {Url, ProxyUrl, Options}, []).
stop(Pool) ->
ok = gen_server:call(Pool, stop, infinity).
@@ -54,10 +58,11 @@ release_worker(Pool, Worker) ->
release_worker_sync(Pool, Worker) ->
ok = gen_server:call(Pool, {release_worker_sync, Worker}).
-init({Url, Options}) ->
+init({Url, ProxyUrl, Options}) ->
process_flag(trap_exit, true),
State = #state{
url = Url,
+ proxy_url = ProxyUrl,
limit = get_value(max_connections, Options)
},
{ok, State}.
@@ -68,6 +73,7 @@ handle_call(get_worker, From, State) ->
waiting = Waiting,
callers = Callers,
url = Url,
+ proxy_url = ProxyUrl,
limit = Limit,
workers = Workers
} = State,
@@ -77,7 +83,7 @@ handle_call(get_worker, From, State) ->
false ->
% If the call to acquire fails, the worker pool will crash with a
% badmatch.
- {ok, Worker} = couch_replicator_connection:acquire(Url),
+ {ok, Worker} = couch_replicator_connection:acquire(Url, ProxyUrl),
NewState = State#state{
workers = [Worker | Workers],
callers = monitor_client(Callers, Worker, From)
@@ -97,6 +103,7 @@ handle_cast({release_worker, Worker}, State) ->
handle_info({'EXIT', Pid, _Reason}, State) ->
#state{
url = Url,
+ proxy_url = ProxyUrl,
workers = Workers,
waiting = Waiting,
callers = Callers
@@ -111,7 +118,7 @@ handle_info({'EXIT', Pid, _Reason}, State) ->
{noreply, State#state{workers = Workers2,
callers = NewCallers0}};
{{value, From}, Waiting2} ->
- {ok, Worker} = couch_replicator_connection:acquire(Url),
+ {ok, Worker} = couch_replicator_connection:acquire(Url, ProxyUrl),
NewCallers1 = monitor_client(NewCallers0, Worker, From),
gen_server:reply(From, {ok, Worker}),
NewState = State#state{