author    Benjamin Bastian <benjamin.bastian@gmail.com>  2016-08-02 15:01:23 -0700
committer Nick Vatamaniuc <vatamane@apache.org>          2017-04-28 17:35:50 -0400
commit    25054365a0a198d829a7414f8c0c10e0a5ac6651 (patch)
tree      645c807d10f68b7b1167520b4cd30ed2a32f4a47
parent    4d2969d9a3e4899554a96d6711bf1d571277766a (diff)
download  couchdb-25054365a0a198d829a7414f8c0c10e0a5ac6651.tar.gz
Share connections between replications
This commit adds functionality to share connections between replications. This is to solve two problems:

- Prior to this commit, each replication would create a pool of connections and hold onto those connections as long as the replication existed. This was wasteful and caused CouchDB to use many unnecessary connections.

- When the pool was being terminated, the pool would block while the socket was closed. This would cause the entire replication scheduler to block. By reusing connections, connections are never closed by clients. They are only ever relinquished. This operation is always fast.

This commit adds an intermediary process which tracks which connection processes are being used by which client. It monitors clients and connections. If a client or connection crashes, the paired client/connection will be terminated. A client can gracefully relinquish ownership of a connection. If that happens, the connection will be shared with another client. If the connection remains idle for too long, it will be closed.

Jira: COUCHDB-3324
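A minimal usage sketch of the new sharing API, based only on the acquire/1 and release/1 functions added in this commit (the URL and timeout values are examples):

    % acquire/1 hands out an idle shared connection for the URL's host:port,
    % or transparently spawns and registers a fresh ibrowse worker when all
    % existing connections are checked out.
    Url = "http://localhost:5984/db",
    {ok, Worker} = couch_replicator_connection:acquire(Url),
    try
        % Worker is a plain ibrowse connection process
        ibrowse:send_req_direct(Worker, Url, [], get, [], [], 30000)
    after
        % relinquish rather than close; the connection stays open for reuse
        couch_replicator_connection:release(Worker)
    end.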
-rw-r--r--  src/couch_replicator/src/couch_replicator_connection.erl  237
-rw-r--r--  src/couch_replicator/src/couch_replicator_httpc.erl       123
-rw-r--r--  src/couch_replicator/src/couch_replicator_httpc_pool.erl   79
3 files changed, 354 insertions, 85 deletions
diff --git a/src/couch_replicator/src/couch_replicator_connection.erl b/src/couch_replicator/src/couch_replicator_connection.erl
new file mode 100644
index 000000000..9c6472360
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_connection.erl
@@ -0,0 +1,237 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_connection).
+
+-behavior(gen_server).
+-behavior(config_listener).
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_info/2,
+ handle_cast/2,
+ code_change/3
+]).
+
+-export([
+ acquire/1,
+ release/1
+]).
+
+-export([
+ handle_config_change/5,
+ handle_config_terminate/3
+]).
+
+-include_lib("ibrowse/include/ibrowse.hrl").
+
+-define(DEFAULT_CLOSE_INTERVAL, 90000).
+-define(RELISTEN_DELAY, 5000).
+
+
+-record(state, {
+ close_interval,
+ timer
+}).
+
+-record(connection, {
+ worker,
+ host,
+ port,
+ mref
+}).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init([]) ->
+ process_flag(trap_exit, true),
+ ?MODULE = ets:new(?MODULE, [named_table, public,
+ {keypos, #connection.worker}]),
+ ok = config:listen_for_changes(?MODULE, nil),
+ Interval = config:get_integer("replicator", "connection_close_interval",
+ ?DEFAULT_CLOSE_INTERVAL),
+ Timer = erlang:send_after(Interval, self(), close_idle_connections),
+ ibrowse:add_config([{inactivity_timeout, Interval}]),
+ {ok, #state{close_interval=Interval, timer=Timer}}.
+
+
+acquire(URL) when is_binary(URL) ->
+ acquire(binary_to_list(URL));
+
+acquire(URL0) ->
+ URL = couch_util:url_strip_password(URL0),
+ case gen_server:call(?MODULE, {acquire, URL}) of
+ {ok, Worker} ->
+ link(Worker),
+ {ok, Worker};
+ {error, all_allocated} ->
+ {ok, Pid} = ibrowse:spawn_link_worker_process(URL),
+ ok = gen_server:call(?MODULE, {create, URL, Pid}),
+ {ok, Pid};
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+
+release(Worker) ->
+ unlink(Worker),
+ gen_server:cast(?MODULE, {release, Worker}).
+
+
+handle_call({acquire, URL}, From, State) ->
+ {Pid, _Ref} = From,
+ case ibrowse_lib:parse_url(URL) of
+ #url{host=Host, port=Port} ->
+ Pat = #connection{host=Host, port=Port, mref=undefined, _='_'},
+ case ets:match_object(?MODULE, Pat, 1) of
+ '$end_of_table' ->
+ {reply, {error, all_allocated}, State};
+ {[Worker], _Cont} ->
+ couch_stats:increment_counter([couch_replicator, connection,
+ acquires]),
+ ets:insert(?MODULE, Worker#connection{mref=monitor(process,
+ Pid)}),
+ {reply, {ok, Worker#connection.worker}, State}
+ end;
+ {error, invalid_uri} ->
+ {reply, {error, invalid_uri}, State}
+ end;
+
+handle_call({create, URL, Worker}, From, State) ->
+ {Pid, _Ref} = From,
+ case ibrowse_lib:parse_url(URL) of
+ #url{host=Host, port=Port} ->
+ link(Worker),
+ couch_stats:increment_counter([couch_replicator, connection,
+ creates]),
+ true = ets:insert_new(
+ ?MODULE,
+ #connection{host=Host, port=Port, worker=Worker,
+ mref=monitor(process, Pid)}
+ ),
+ {reply, ok, State}
+ end.
+
+
+handle_cast({release, WorkerPid}, State) ->
+ couch_stats:increment_counter([couch_replicator, connection, releases]),
+ case ets:lookup(?MODULE, WorkerPid) of
+ [Worker] ->
+ case Worker#connection.mref of
+ MRef when is_reference(MRef) -> demonitor(MRef, [flush]);
+ undefined -> ok
+ end,
+ ets:insert(?MODULE, Worker#connection{mref=undefined});
+ [] ->
+ ok
+ end,
+ {noreply, State};
+
+handle_cast({connection_close_interval, V}, State) ->
+ erlang:cancel_timer(State#state.timer),
+ NewTimer = erlang:send_after(V, self(), close_idle_connections),
+ ibrowse:add_config([{inactivity_timeout, V}]),
+ {noreply, State#state{close_interval=V, timer=NewTimer}}.
+
+
+% owner crashed
+handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) ->
+ couch_stats:increment_counter([couch_replicator, connection,
+ owner_crashes]),
+ ets:match_delete(?MODULE, #connection{mref=Ref, _='_'}),
+ {noreply, State};
+
+% worker crashed
+handle_info({'EXIT', Pid, Reason}, State) ->
+ couch_stats:increment_counter([couch_replicator, connection,
+ worker_crashes]),
+ case ets:lookup(?MODULE, Pid) of
+ [] ->
+ ok;
+ [Worker] ->
+ #connection{host=Host, port=Port} = Worker,
+ maybe_log_worker_death(Host, Port, Reason),
+ case Worker#connection.mref of
+ MRef when is_reference(MRef) -> demonitor(MRef, [flush]);
+ undefined -> ok
+ end,
+ ets:delete(?MODULE, Pid)
+ end,
+ {noreply, State};
+
+handle_info(close_idle_connections, State) ->
+ #state{
+ close_interval=Interval,
+ timer=Timer
+ } = State,
+ Conns = ets:match_object(?MODULE, #connection{mref=undefined, _='_'}),
+ lists:foreach(fun(Conn) ->
+ couch_stats:increment_counter([couch_replicator, connection, closes]),
+ delete_worker(Conn)
+ end, Conns),
+ erlang:cancel_timer(Timer),
+ NewTimer = erlang:send_after(Interval, self(), close_idle_connections),
+ {noreply, State#state{timer=NewTimer}};
+
+handle_info(restart_config_listener, State) ->
+ ok = config:listen_for_changes(?MODULE, nil),
+ {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+
+maybe_log_worker_death(_Host, _Port, normal) ->
+ ok;
+
+maybe_log_worker_death(Host, Port, Reason) ->
+ ErrMsg = "Replication connection to: ~p:~p died with reason ~p",
+ couch_log:info(ErrMsg, [Host, Port, Reason]).
+
+
+-spec delete_worker(#connection{}) -> ok.
+delete_worker(Worker) ->
+ ets:delete(?MODULE, Worker#connection.worker),
+ unlink(Worker#connection.worker),
+ spawn(fun() -> ibrowse_http_client:stop(Worker#connection.worker) end),
+ ok.
+
+
+handle_config_change("replicator", "connection_close_interval", V, _, S) ->
+ ok = gen_server:cast(?MODULE, {connection_close_interval,
+ list_to_integer(V)}),
+ {ok, S};
+
+handle_config_change(_, _, _, _, S) ->
+ {ok, S}.
+
+
+handle_config_terminate(_, stop, _) ->
+ ok;
+
+handle_config_terminate(_, _, _) ->
+ Pid = whereis(?MODULE),
+ erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
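The idle-close interval used above is read from the "replicator" config section and also pushed to ibrowse as its inactivity timeout. A sketch of tuning it at runtime, assuming CouchDB's standard config:set/3 call (the 60000 ms value is only an example; the default is 90000 ms):

    % The config listener above turns this into a {connection_close_interval, V}
    % cast, which reschedules the close_idle_connections timer.
    ok = config:set("replicator", "connection_close_interval", "60000").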
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index 309a230e0..58fb0e178 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -40,8 +40,18 @@
-define(MAX_DISCARDED_MESSAGES, 16).
-setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
- {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
+setup(Db) ->
+ #httpdb{
+ httpc_pool = nil,
+ url = Url,
+ http_connections = MaxConns,
+ proxy_url = ProxyURL
+ } = Db,
+ HttpcURL = case ProxyURL of
+ undefined -> Url;
+ _ when is_list(ProxyURL) -> ProxyURL
+ end,
+ {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL, [{max_connections, MaxConns}]),
{ok, Db#httpdb{httpc_pool = Pid}}.
@@ -98,6 +108,7 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
lists:ukeymerge(1, get_value(ibrowse_options, Params, []),
HttpDb#httpdb.ibrowse_options)
],
+ backoff_before_request(Worker, HttpDb, Params),
Response = ibrowse:send_req_direct(
Worker, Url, Headers2, Method, Body, IbrowseOptions, Timeout),
{Worker, Response}.
@@ -139,8 +150,9 @@ process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
case list_to_integer(Code) of
429 ->
- backoff(Worker, HttpDb, Params);
+ backoff(HttpDb, Params);
Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
+ backoff_success(HttpDb, Params),
couch_stats:increment_counter([couch_replicator, responses, success]),
EJson = case Body of
<<>> ->
@@ -150,6 +162,7 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
end,
Callback(Ok, Headers, EJson);
R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+ backoff_success(HttpDb, Params),
do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
Error ->
couch_stats:increment_counter([couch_replicator, responses, failure]),
@@ -165,9 +178,10 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
{ibrowse_async_headers, ReqId, Code, Headers} ->
case list_to_integer(Code) of
429 ->
- backoff(Worker, HttpDb#httpdb{timeout = get_max_back_off()},
- Params);
+ Timeout = couch_replicator_rate_limiter:max_interval(),
+ backoff(HttpDb#httpdb{timeout = Timeout}, Params);
Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) ->
+ backoff_success(HttpDb, Params),
StreamDataFun = fun() ->
stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
end,
@@ -184,6 +198,7 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
maybe_retry(Err, Worker, HttpDb, Params)
end;
R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+ backoff_success(HttpDb, Params),
do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
Error ->
couch_stats:increment_counter(
@@ -266,37 +281,48 @@ discard_message(ReqId, Worker, Count) ->
end.
-%% For 429 errors, we perform an exponential backoff up to 2.17 hours.
-%% We use Backoff time as a timeout/failure end.
-backoff(Worker, #httpdb{backoff = Backoff} = HttpDb, Params) ->
- MaxBackOff = get_max_back_off(),
- MaxBackOffLog = get_back_off_log_threshold(),
- ok = timer:sleep(random:uniform(Backoff)),
- Backoff2 = round(Backoff*get_back_off_exp()),
- NewBackoff = erlang:min(Backoff2, MaxBackOff),
- NewHttpDb = HttpDb#httpdb{backoff = NewBackoff},
- case Backoff2 of
- W0 when W0 > MaxBackOff ->
- report_error(Worker, HttpDb, Params, {error,
- "Long 429-induced Retry Time Out"});
- W1 when W1 >= MaxBackOffLog -> % Past 8 min, we log retries
- log_retry_error(Params, HttpDb, Backoff2, "429 Retry"),
- throw({retry, NewHttpDb, Params});
- _ ->
- throw({retry, NewHttpDb, Params})
- end.
-
-
maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params) ->
report_error(Worker, HttpDb, Params, {error, Error});
-maybe_retry(Error, _Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
+maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
Params) ->
- ok = timer:sleep(Wait),
- log_retry_error(Params, HttpDb, Wait, Error),
- Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
- NewHttpDb = HttpDb#httpdb{retries = Retries - 1, wait = Wait2},
- throw({retry, NewHttpDb, Params}).
+ case total_error_time_exceeded(HttpDb) of
+ true ->
+ report_error(Worker, HttpDb, Params, {error, Error});
+ false ->
+ ok = timer:sleep(Wait),
+ log_retry_error(Params, HttpDb, Wait, Error),
+ Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
+ HttpDb1 = HttpDb#httpdb{retries = Retries - 1, wait = Wait2},
+ HttpDb2 = update_first_error_timestamp(HttpDb1),
+ throw({retry, HttpDb2, Params})
+ end.
+
+
+% When retrying, check to make sure the total time spent retrying a request
+% stays below the current scheduler health threshold. The goal is to not
+% exceed the threshold, otherwise a job which keeps retrying for too long
+% would still be considered healthy.
+total_error_time_exceeded(#httpdb{first_error_timestamp = nil}) ->
+ false;
+
+total_error_time_exceeded(#httpdb{first_error_timestamp = ErrorTimestamp}) ->
+ HealthThresholdSec = couch_replicator_scheduler:health_threshold(),
+ % Theshold value is halved because in the calling code the next step
+ % is a doubling. Not halving here could mean sleeping too long and
+ % exceeding the health threshold.
+ ThresholdUSec = (HealthThresholdSec / 2) * 1000000,
+ timer:now_diff(os:timestamp(), ErrorTimestamp) > ThresholdUSec.
+
+
+% Remember the first time an error occurs. This value is used later to check
+% the total time spent retrying a request. Because retrying is recursive, on
+% a successful result the #httpdb{} record is reset back to its original value.
+update_first_error_timestamp(#httpdb{first_error_timestamp = nil} = HttpDb) ->
+ HttpDb#httpdb{first_error_timestamp = os:timestamp()};
+
+update_first_error_timestamp(HttpDb) ->
+ HttpDb.
log_retry_error(Params, HttpDb, Wait, Error) ->
@@ -440,11 +466,32 @@ after_redirect(RedirectUrl, HttpDb, Params) ->
Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
{HttpDb#httpdb{url = RedirectUrl}, Params2}.
-get_max_back_off() ->
- config:get_integer("replicator", "max_backoff_wait", 250 * 32768).
-get_back_off_log_threshold() ->
- config:get_integer("replicator", "max_backoff_log_threshold", 512000).
+backoff_key(HttpDb, Params) ->
+ Method = get_value(method, Params, get),
+ Url = HttpDb#httpdb.url,
+ {Url, Method}.
+
+
+backoff(HttpDb, Params) ->
+ Key = backoff_key(HttpDb, Params),
+ couch_replicator_rate_limiter:failure(Key),
+ throw({retry, HttpDb, Params}).
-get_back_off_exp() ->
- config:get_float("replicator", "backoff_exp", 1.5).
+
+backoff_success(HttpDb, Params) ->
+ Key = backoff_key(HttpDb, Params),
+ couch_replicator_rate_limiter:success(Key).
+
+
+backoff_before_request(Worker, HttpDb, Params) ->
+ Key = backoff_key(HttpDb, Params),
+ Limit = couch_replicator_rate_limiter:max_interval(),
+ case couch_replicator_rate_limiter:interval(Key) of
+ Sleep when Sleep >= Limit ->
+ report_error(Worker, HttpDb, Params, max_backoff);
+ Sleep when Sleep >= 1 ->
+ timer:sleep(Sleep);
+ Sleep when Sleep == 0 ->
+ ok
+ end.
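To make the health-threshold check in maybe_retry/4 concrete, a worked sketch of the same arithmetic as total_error_time_exceeded/1, under a hypothetical couch_replicator_scheduler:health_threshold() of 120 seconds:

    % Retries for a single request are abandoned once more than half the
    % health threshold (in microseconds) has elapsed since the first error.
    HealthThresholdSec = 120,                      % hypothetical value
    ThresholdUSec = (HealthThresholdSec / 2) * 1000000,
    FirstErrorTimestamp = os:timestamp(),
    % ... the request fails and is retried a few times ...
    Exceeded = timer:now_diff(os:timestamp(), FirstErrorTimestamp) > ThresholdUSec.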
diff --git a/src/couch_replicator/src/couch_replicator_httpc_pool.erl b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
index 09e3b2381..33fb61f1f 100644
--- a/src/couch_replicator/src/couch_replicator_httpc_pool.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc_pool.erl
@@ -31,8 +31,7 @@
-record(state, {
url,
limit, % max # of workers allowed
- free = [], % free workers (connections)
- busy = [], % busy workers (connections)
+ workers = [],
waiting = queue:new(), % blocked clients waiting for a worker
callers = [] % clients who've been given a worker
}).
@@ -70,23 +69,17 @@ handle_call(get_worker, From, State) ->
callers = Callers,
url = Url,
limit = Limit,
- busy = Busy,
- free = Free
+ workers = Workers
} = State,
- case length(Busy) >= Limit of
+ case length(Workers) >= Limit of
true ->
{noreply, State#state{waiting = queue:in(From, Waiting)}};
false ->
- case Free of
- [] ->
- {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
- Free2 = Free;
- [Worker | Free2] ->
- ok
- end,
+ % If the call to acquire fails, the worker pool will crash with a
+ % badmatch.
+ {ok, Worker} = couch_replicator_connection:acquire(Url),
NewState = State#state{
- free = Free2,
- busy = [Worker | Busy],
+ workers = [Worker | Workers],
callers = monitor_client(Callers, Worker, From)
},
{reply, {ok, Worker}, NewState}
@@ -104,35 +97,30 @@ handle_cast({release_worker, Worker}, State) ->
handle_info({'EXIT', Pid, _Reason}, State) ->
#state{
url = Url,
- busy = Busy,
- free = Free,
+ workers = Workers,
waiting = Waiting,
callers = Callers
} = State,
NewCallers0 = demonitor_client(Callers, Pid),
- case Free -- [Pid] of
- Free ->
- case Busy -- [Pid] of
- Busy ->
+ case Workers -- [Pid] of
+ Workers ->
{noreply, State#state{callers = NewCallers0}};
- Busy2 ->
+ Workers2 ->
case queue:out(Waiting) of
- {empty, _} ->
- {noreply, State#state{busy = Busy2, callers = NewCallers0}};
- {{value, From}, Waiting2} ->
- {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
- NewCallers1 = monitor_client(NewCallers0, Worker, From),
- gen_server:reply(From, {ok, Worker}),
- NewState = State#state{
- busy = [Worker | Busy2],
- waiting = Waiting2,
- callers = NewCallers1
- },
- {noreply, NewState}
+ {empty, _} ->
+ {noreply, State#state{workers = Workers2,
+ callers = NewCallers0}};
+ {{value, From}, Waiting2} ->
+ {ok, Worker} = couch_replicator_connection:acquire(Url),
+ NewCallers1 = monitor_client(NewCallers0, Worker, From),
+ gen_server:reply(From, {ok, Worker}),
+ NewState = State#state{
+ workers = [Worker | Workers2],
+ waiting = Waiting2,
+ callers = NewCallers1
+ },
+ {noreply, NewState}
end
- end;
- Free2 ->
- {noreply, State#state{free = Free2, callers = NewCallers0}}
end;
handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State) ->
@@ -147,9 +135,8 @@ code_change(_OldVsn, #state{}=State, _Extra) ->
{ok, State}.
-terminate(_Reason, State) ->
- lists:foreach(fun ibrowse_http_client:stop/1, State#state.free),
- lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy).
+terminate(_Reason, _State) ->
+ ok.
monitor_client(Callers, Worker, {ClientPid, _}) ->
[{Worker, erlang:monitor(process, ClientPid)} | Callers].
@@ -167,22 +154,20 @@ release_worker_internal(Worker, State) ->
#state{waiting = Waiting, callers = Callers} = State,
NewCallers0 = demonitor_client(Callers, Worker),
case is_process_alive(Worker) andalso
- lists:member(Worker, State#state.busy) of
+ lists:member(Worker, State#state.workers) of
true ->
- case queue:out(Waiting) of
+ Workers = case queue:out(Waiting) of
{empty, Waiting2} ->
NewCallers1 = NewCallers0,
- Busy2 = State#state.busy -- [Worker],
- Free2 = [Worker | State#state.free];
+ couch_replicator_connection:release(Worker),
+ State#state.workers -- [Worker];
{{value, From}, Waiting2} ->
NewCallers1 = monitor_client(NewCallers0, Worker, From),
gen_server:reply(From, {ok, Worker}),
- Busy2 = State#state.busy,
- Free2 = State#state.free
+ State#state.workers
end,
NewState = State#state{
- busy = Busy2,
- free = Free2,
+ workers = Workers,
waiting = Waiting2,
callers = NewCallers1
},
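Pool-level usage is unchanged by this commit; only the backing connection management moved into couch_replicator_connection. A rough sketch from a caller's point of view, assuming the pool's existing get_worker/1 and release_worker/2 wrappers around the handlers shown above (the max_connections value is an example):

    {ok, Pool} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, 20}]),
    {ok, Worker} = couch_replicator_httpc_pool:get_worker(Pool),
    % ... issue requests on Worker via ibrowse:send_req_direct/7 ...
    ok = couch_replicator_httpc_pool:release_worker(Pool, Worker).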