diff options
author | Benjamin Bastian <benjamin.bastian@gmail.com> | 2016-08-02 15:01:23 -0700 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2017-04-28 17:35:50 -0400 |
commit | 25054365a0a198d829a7414f8c0c10e0a5ac6651 (patch) | |
tree | 645c807d10f68b7b1167520b4cd30ed2a32f4a47 | |
parent | 4d2969d9a3e4899554a96d6711bf1d571277766a (diff) | |
download | couchdb-25054365a0a198d829a7414f8c0c10e0a5ac6651.tar.gz |
Share connections between replications
This commit adds functionality to share connections between
replications. This is to solve two problems:
- Prior to this commit, each replication would create a pool of
connections and hold onto those connections as long as the replication
existed. This was wasteful and caused CouchDB to use many unnecessary
connections.
- When the pool was being terminated, the pool would block while the
socket was closed. This would cause the entire replication scheduler
to block. By reusing connections, connections are never closed by
clients. They are only ever relinquished. This operation is always
fast.
This commit adds an intermediary process which tracks which connection
processes are being used by which client. It monitors clients and
connections. If a client or connection crashes, the paired
client/connection will be terminated.
A client can gracefully relinquish ownership of a connection. If that
happens, the connection will be shared with another client. If the
connection remains idle for too long, it will be closed.
Jira: COUCHDB-3324
3 files changed, 354 insertions, 85 deletions
diff --git a/src/couch_replicator/src/couch_replicator_connection.erl b/src/couch_replicator/src/couch_replicator_connection.erl new file mode 100644 index 000000000..9c6472360 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_connection.erl @@ -0,0 +1,237 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_connection). + +-behavior(gen_server). +-behavior(config_listener). + +-export([ + start_link/0 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_info/2, + handle_cast/2, + code_change/3 +]). + +-export([ + acquire/1, + release/1 +]). + +-export([ + handle_config_change/5, + handle_config_terminate/3 +]). + +-include_lib("ibrowse/include/ibrowse.hrl"). + +-define(DEFAULT_CLOSE_INTERVAL, 90000). +-define(RELISTEN_DELAY, 5000). + + +-record(state, { + close_interval, + timer +}). + +-record(connection, { + worker, + host, + port, + mref +}). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +init([]) -> + process_flag(trap_exit, true), + ?MODULE = ets:new(?MODULE, [named_table, public, + {keypos, #connection.worker}]), + ok = config:listen_for_changes(?MODULE, nil), + Interval = config:get_integer("replicator", "connection_close_interval", + ?DEFAULT_CLOSE_INTERVAL), + Timer = erlang:send_after(Interval, self(), close_idle_connections), + ibrowse:add_config([{inactivity_timeout, Interval}]), + {ok, #state{close_interval=Interval, timer=Timer}}. 
+ + +acquire(URL) when is_binary(URL) -> + acquire(binary_to_list(URL)); + +acquire(URL0) -> + URL = couch_util:url_strip_password(URL0), + case gen_server:call(?MODULE, {acquire, URL}) of + {ok, Worker} -> + link(Worker), + {ok, Worker}; + {error, all_allocated} -> + {ok, Pid} = ibrowse:spawn_link_worker_process(URL), + ok = gen_server:call(?MODULE, {create, URL, Pid}), + {ok, Pid}; + {error, Reason} -> + {error, Reason} + end. + + +release(Worker) -> + unlink(Worker), + gen_server:cast(?MODULE, {release, Worker}). + + +handle_call({acquire, URL}, From, State) -> + {Pid, _Ref} = From, + case ibrowse_lib:parse_url(URL) of + #url{host=Host, port=Port} -> + Pat = #connection{host=Host, port=Port, mref=undefined, _='_'}, + case ets:match_object(?MODULE, Pat, 1) of + '$end_of_table' -> + {reply, {error, all_allocated}, State}; + {[Worker], _Cont} -> + couch_stats:increment_counter([couch_replicator, connection, + acquires]), + ets:insert(?MODULE, Worker#connection{mref=monitor(process, + Pid)}), + {reply, {ok, Worker#connection.worker}, State} + end; + {error, invalid_uri} -> + {reply, {error, invalid_uri}, State} + end; + +handle_call({create, URL, Worker}, From, State) -> + {Pid, _Ref} = From, + case ibrowse_lib:parse_url(URL) of + #url{host=Host, port=Port} -> + link(Worker), + couch_stats:increment_counter([couch_replicator, connection, + creates]), + true = ets:insert_new( + ?MODULE, + #connection{host=Host, port=Port, worker=Worker, + mref=monitor(process, Pid)} + ), + {reply, ok, State} + end. 
+ + +handle_cast({release, WorkerPid}, State) -> + couch_stats:increment_counter([couch_replicator, connection, releases]), + case ets:lookup(?MODULE, WorkerPid) of + [Worker] -> + case Worker#connection.mref of + MRef when is_reference(MRef) -> demonitor(MRef, [flush]); + undefined -> ok + end, + ets:insert(?MODULE, Worker#connection{mref=undefined}); + [] -> + ok + end, + {noreply, State}; + +handle_cast({connection_close_interval, V}, State) -> + erlang:cancel_timer(State#state.timer), + NewTimer = erlang:send_after(V, self(), close_idle_connections), + ibrowse:add_config([{inactivity_timeout, V}]), + {noreply, State#state{close_interval=V, timer=NewTimer}}. + + +% owner crashed +handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) -> + couch_stats:increment_counter([couch_replicator, connection, + owner_crashes]), + ets:match_delete(?MODULE, #connection{mref=Ref, _='_'}), + {noreply, State}; + +% worker crashed +handle_info({'EXIT', Pid, Reason}, State) -> + couch_stats:increment_counter([couch_replicator, connection, + worker_crashes]), + case ets:lookup(?MODULE, Pid) of + [] -> + ok; + [Worker] -> + #connection{host=Host, port=Port} = Worker, + maybe_log_worker_death(Host, Port, Reason), + case Worker#connection.mref of + MRef when is_reference(MRef) -> demonitor(MRef, [flush]); + undefined -> ok + end, + ets:delete(?MODULE, Pid) + end, + {noreply, State}; + +handle_info(close_idle_connections, State) -> + #state{ + close_interval=Interval, + timer=Timer + } = State, + Conns = ets:match_object(?MODULE, #connection{mref=undefined, _='_'}), + lists:foreach(fun(Conn) -> + couch_stats:increment_counter([couch_replicator, connection, closes]), + delete_worker(Conn) + end, Conns), + erlang:cancel_timer(Timer), + NewTimer = erlang:send_after(Interval, self(), close_idle_connections), + {noreply, State#state{timer=NewTimer}}; + +handle_info(restart_config_listener, State) -> + ok = config:listen_for_changes(?MODULE, nil), + {noreply, State}. 
+ + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +terminate(_Reason, _State) -> + ok. + + +maybe_log_worker_death(_Host, _Port, normal) -> + ok; + +maybe_log_worker_death(Host, Port, Reason) -> + ErrMsg = "Replication connection to: ~p:~p died with reason ~p", + couch_log:info(ErrMsg, [Host, Port, Reason]). + + +-spec delete_worker(#connection{}) -> ok. +delete_worker(Worker) -> + ets:delete(?MODULE, Worker#connection.worker), + unlink(Worker#connection.worker), + spawn(fun() -> ibrowse_http_client:stop(Worker#connection.worker) end), + ok. + + +handle_config_change("replicator", "connection_close_interval", V, _, S) -> + ok = gen_server:cast(?MODULE, {connection_close_interval, + list_to_integer(V)}), + {ok, S}; + +handle_config_change(_, _, _, _, S) -> + {ok, S}. + + +handle_config_terminate(_, stop, _) -> + ok; + +handle_config_terminate(_, _, _) -> + Pid = whereis(?MODULE), + erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener). diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl index 309a230e0..58fb0e178 100644 --- a/src/couch_replicator/src/couch_replicator_httpc.erl +++ b/src/couch_replicator/src/couch_replicator_httpc.erl @@ -40,8 +40,18 @@ -define(MAX_DISCARDED_MESSAGES, 16). -setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) -> - {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, MaxConns}]), +setup(Db) -> + #httpdb{ + httpc_pool = nil, + url = Url, + http_connections = MaxConns, + proxy_url = ProxyURL + } = Db, + HttpcURL = case ProxyURL of + undefined -> Url; + _ when is_list(ProxyURL) -> ProxyURL + end, + {ok, Pid} = couch_replicator_httpc_pool:start_link(HttpcURL, [{max_connections, MaxConns}]), {ok, Db#httpdb{httpc_pool = Pid}}. 
@@ -98,6 +108,7 @@ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) -> lists:ukeymerge(1, get_value(ibrowse_options, Params, []), HttpDb#httpdb.ibrowse_options) ], + backoff_before_request(Worker, HttpDb, Params), Response = ibrowse:send_req_direct( Worker, Url, Headers2, Method, Body, IbrowseOptions, Timeout), {Worker, Response}. @@ -139,8 +150,9 @@ process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) -> process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) -> case list_to_integer(Code) of 429 -> - backoff(Worker, HttpDb, Params); + backoff(HttpDb, Params); Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) -> + backoff_success(HttpDb, Params), couch_stats:increment_counter([couch_replicator, responses, success]), EJson = case Body of <<>> -> @@ -150,6 +162,7 @@ process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) -> end, Callback(Ok, Headers, EJson); R when R =:= 301 ; R =:= 302 ; R =:= 303 -> + backoff_success(HttpDb, Params), do_redirect(Worker, R, Headers, HttpDb, Params, Callback); Error -> couch_stats:increment_counter([couch_replicator, responses, failure]), @@ -165,9 +178,10 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) -> {ibrowse_async_headers, ReqId, Code, Headers} -> case list_to_integer(Code) of 429 -> - backoff(Worker, HttpDb#httpdb{timeout = get_max_back_off()}, - Params); + Timeout = couch_replicator_rate_limiter:max_interval(), + backoff(HttpDb#httpdb{timeout = Timeout}, Params); Ok when (Ok >= 200 andalso Ok < 300) ; (Ok >= 400 andalso Ok < 500) -> + backoff_success(HttpDb, Params), StreamDataFun = fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Callback) end, @@ -184,6 +198,7 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) -> maybe_retry(Err, Worker, HttpDb, Params) end; R when R =:= 301 ; R =:= 302 ; R =:= 303 -> + backoff_success(HttpDb, Params), do_redirect(Worker, R, Headers, HttpDb, 
Params, Callback); Error -> couch_stats:increment_counter( @@ -266,37 +281,48 @@ discard_message(ReqId, Worker, Count) -> end. -%% For 429 errors, we perform an exponential backoff up to 2.17 hours. -%% We use Backoff time as a timeout/failure end. -backoff(Worker, #httpdb{backoff = Backoff} = HttpDb, Params) -> - MaxBackOff = get_max_back_off(), - MaxBackOffLog = get_back_off_log_threshold(), - ok = timer:sleep(random:uniform(Backoff)), - Backoff2 = round(Backoff*get_back_off_exp()), - NewBackoff = erlang:min(Backoff2, MaxBackOff), - NewHttpDb = HttpDb#httpdb{backoff = NewBackoff}, - case Backoff2 of - W0 when W0 > MaxBackOff -> - report_error(Worker, HttpDb, Params, {error, - "Long 429-induced Retry Time Out"}); - W1 when W1 >= MaxBackOffLog -> % Past 8 min, we log retries - log_retry_error(Params, HttpDb, Backoff2, "429 Retry"), - throw({retry, NewHttpDb, Params}); - _ -> - throw({retry, NewHttpDb, Params}) - end. - - maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params) -> report_error(Worker, HttpDb, Params, {error, Error}); -maybe_retry(Error, _Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb, +maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb, Params) -> - ok = timer:sleep(Wait), - log_retry_error(Params, HttpDb, Wait, Error), - Wait2 = erlang:min(Wait * 2, ?MAX_WAIT), - NewHttpDb = HttpDb#httpdb{retries = Retries - 1, wait = Wait2}, - throw({retry, NewHttpDb, Params}). + case total_error_time_exceeded(HttpDb) of + true -> + report_error(Worker, HttpDb, Params, {error, Error}); + false -> + ok = timer:sleep(Wait), + log_retry_error(Params, HttpDb, Wait, Error), + Wait2 = erlang:min(Wait * 2, ?MAX_WAIT), + HttpDb1 = HttpDb#httpdb{retries = Retries - 1, wait = Wait2}, + HttpDb2 = update_first_error_timestamp(HttpDb1), + throw({retry, HttpDb2, Params}) + end. + + +% When retrying, check to make total time spent retrying a request is below +% the current scheduler health threshold. 
The goal is to not exceed the +% threshold, otherwise the job which keep retrying too long will still be +% considered healthy. +total_error_time_exceeded(#httpdb{first_error_timestamp = nil}) -> + false; + +total_error_time_exceeded(#httpdb{first_error_timestamp = ErrorTimestamp}) -> + HealthThresholdSec = couch_replicator_scheduler:health_threshold(), + % Theshold value is halved because in the calling code the next step + % is a doubling. Not halving here could mean sleeping too long and + % exceeding the health threshold. + ThresholdUSec = (HealthThresholdSec / 2) * 1000000, + timer:now_diff(os:timestamp(), ErrorTimestamp) > ThresholdUSec. + + +% Remember the first time an error occurs. This value is used later to check +% the total time spend retrying a request. Because retrying is cursive, on +% successful result #httpdb{} record is reset back to the original value. +update_first_error_timestamp(#httpdb{first_error_timestamp = nil} = HttpDb) -> + HttpDb#httpdb{first_error_timestamp = os:timestamp()}; + +update_first_error_timestamp(HttpDb) -> + HttpDb. log_retry_error(Params, HttpDb, Wait, Error) -> @@ -440,11 +466,32 @@ after_redirect(RedirectUrl, HttpDb, Params) -> Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)), {HttpDb#httpdb{url = RedirectUrl}, Params2}. -get_max_back_off() -> - config:get_integer("replicator", "max_backoff_wait", 250 * 32768). -get_back_off_log_threshold() -> - config:get_integer("replicator", "max_backoff_log_threshold", 512000). +backoff_key(HttpDb, Params) -> + Method = get_value(method, Params, get), + Url = HttpDb#httpdb.url, + {Url, Method}. + + +backoff(HttpDb, Params) -> + Key = backoff_key(HttpDb, Params), + couch_replicator_rate_limiter:failure(Key), + throw({retry, HttpDb, Params}). -get_back_off_exp() -> - config:get_float("replicator", "backoff_exp", 1.5). + +backoff_success(HttpDb, Params) -> + Key = backoff_key(HttpDb, Params), + couch_replicator_rate_limiter:success(Key). 
+ + +backoff_before_request(Worker, HttpDb, Params) -> + Key = backoff_key(HttpDb, Params), + Limit = couch_replicator_rate_limiter:max_interval(), + case couch_replicator_rate_limiter:interval(Key) of + Sleep when Sleep >= Limit -> + report_error(Worker, HttpDb, Params, max_backoff); + Sleep when Sleep >= 1 -> + timer:sleep(Sleep); + Sleep when Sleep == 0 -> + ok + end. diff --git a/src/couch_replicator/src/couch_replicator_httpc_pool.erl b/src/couch_replicator/src/couch_replicator_httpc_pool.erl index 09e3b2381..33fb61f1f 100644 --- a/src/couch_replicator/src/couch_replicator_httpc_pool.erl +++ b/src/couch_replicator/src/couch_replicator_httpc_pool.erl @@ -31,8 +31,7 @@ -record(state, { url, limit, % max # of workers allowed - free = [], % free workers (connections) - busy = [], % busy workers (connections) + workers = [], waiting = queue:new(), % blocked clients waiting for a worker callers = [] % clients who've been given a worker }). @@ -70,23 +69,17 @@ handle_call(get_worker, From, State) -> callers = Callers, url = Url, limit = Limit, - busy = Busy, - free = Free + workers = Workers } = State, - case length(Busy) >= Limit of + case length(Workers) >= Limit of true -> {noreply, State#state{waiting = queue:in(From, Waiting)}}; false -> - case Free of - [] -> - {ok, Worker} = ibrowse:spawn_link_worker_process(Url), - Free2 = Free; - [Worker | Free2] -> - ok - end, + % If the call to acquire fails, the worker pool will crash with a + % badmatch. 
+ {ok, Worker} = couch_replicator_connection:acquire(Url), NewState = State#state{ - free = Free2, - busy = [Worker | Busy], + workers = [Worker | Workers], callers = monitor_client(Callers, Worker, From) }, {reply, {ok, Worker}, NewState} @@ -104,35 +97,30 @@ handle_cast({release_worker, Worker}, State) -> handle_info({'EXIT', Pid, _Reason}, State) -> #state{ url = Url, - busy = Busy, - free = Free, + workers = Workers, waiting = Waiting, callers = Callers } = State, NewCallers0 = demonitor_client(Callers, Pid), - case Free -- [Pid] of - Free -> - case Busy -- [Pid] of - Busy -> + case Workers -- [Pid] of + Workers -> {noreply, State#state{callers = NewCallers0}}; - Busy2 -> + Workers2 -> case queue:out(Waiting) of - {empty, _} -> - {noreply, State#state{busy = Busy2, callers = NewCallers0}}; - {{value, From}, Waiting2} -> - {ok, Worker} = ibrowse:spawn_link_worker_process(Url), - NewCallers1 = monitor_client(NewCallers0, Worker, From), - gen_server:reply(From, {ok, Worker}), - NewState = State#state{ - busy = [Worker | Busy2], - waiting = Waiting2, - callers = NewCallers1 - }, - {noreply, NewState} + {empty, _} -> + {noreply, State#state{workers = Workers2, + callers = NewCallers0}}; + {{value, From}, Waiting2} -> + {ok, Worker} = couch_replicator_connection:acquire(Url), + NewCallers1 = monitor_client(NewCallers0, Worker, From), + gen_server:reply(From, {ok, Worker}), + NewState = State#state{ + workers = [Worker | Workers2], + waiting = Waiting2, + callers = NewCallers1 + }, + {noreply, NewState} end - end; - Free2 -> - {noreply, State#state{free = Free2, callers = NewCallers0}} end; handle_info({'DOWN', Ref, process, _, _}, #state{callers = Callers} = State) -> @@ -147,9 +135,8 @@ code_change(_OldVsn, #state{}=State, _Extra) -> {ok, State}. -terminate(_Reason, State) -> - lists:foreach(fun ibrowse_http_client:stop/1, State#state.free), - lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy). +terminate(_Reason, _State) -> + ok. 
monitor_client(Callers, Worker, {ClientPid, _}) -> [{Worker, erlang:monitor(process, ClientPid)} | Callers]. @@ -167,22 +154,20 @@ release_worker_internal(Worker, State) -> #state{waiting = Waiting, callers = Callers} = State, NewCallers0 = demonitor_client(Callers, Worker), case is_process_alive(Worker) andalso - lists:member(Worker, State#state.busy) of + lists:member(Worker, State#state.workers) of true -> - case queue:out(Waiting) of + Workers = case queue:out(Waiting) of {empty, Waiting2} -> NewCallers1 = NewCallers0, - Busy2 = State#state.busy -- [Worker], - Free2 = [Worker | State#state.free]; + couch_replicator_connection:release(Worker), + State#state.workers -- [Worker]; {{value, From}, Waiting2} -> NewCallers1 = monitor_client(NewCallers0, Worker, From), gen_server:reply(From, {ok, Worker}), - Busy2 = State#state.busy, - Free2 = State#state.free + State#state.workers end, NewState = State#state{ - busy = Busy2, - free = Free2, + workers = Workers, waiting = Waiting2, callers = NewCallers1 }, |