diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_connection.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator_connection.erl | 171 |
1 files changed, 91 insertions, 80 deletions
diff --git a/src/couch_replicator/src/couch_replicator_connection.erl b/src/couch_replicator/src/couch_replicator_connection.erl index d1c3d93fc..a158d2609 100644 --- a/src/couch_replicator/src/couch_replicator_connection.erl +++ b/src/couch_replicator/src/couch_replicator_connection.erl @@ -20,18 +20,18 @@ ]). -export([ - init/1, - terminate/2, - handle_call/3, - handle_info/2, - handle_cast/2, - code_change/3 + init/1, + terminate/2, + handle_call/3, + handle_info/2, + handle_cast/2, + code_change/3 ]). -export([ - acquire/1, - acquire/2, - release/1 + acquire/1, + acquire/2, + release/1 ]). -export([ @@ -44,7 +44,6 @@ -define(DEFAULT_CLOSE_INTERVAL, 90000). -define(RELISTEN_DELAY, 5000). - -record(state, { close_interval, timer @@ -59,40 +58,43 @@ mref }). - start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - init([]) -> process_flag(trap_exit, true), - ?MODULE = ets:new(?MODULE, [named_table, public, - {keypos, #connection.worker}]), + ?MODULE = ets:new(?MODULE, [ + named_table, + public, + {keypos, #connection.worker} + ]), ok = config:listen_for_changes(?MODULE, nil), - Interval = config:get_integer("replicator", "connection_close_interval", - ?DEFAULT_CLOSE_INTERVAL), + Interval = config:get_integer( + "replicator", + "connection_close_interval", + ?DEFAULT_CLOSE_INTERVAL + ), Timer = erlang:send_after(Interval, self(), close_idle_connections), ibrowse:add_config([ {inactivity_timeout, Interval}, {worker_trap_exits, false} ]), - {ok, #state{close_interval=Interval, timer=Timer}}. + {ok, #state{close_interval = Interval, timer = Timer}}. acquire(Url) -> acquire(Url, undefined). acquire(Url, ProxyUrl) when is_binary(Url) -> acquire(binary_to_list(Url), ProxyUrl); - acquire(Url, ProxyUrl) when is_binary(ProxyUrl) -> acquire(Url, binary_to_list(ProxyUrl)); - acquire(Url0, ProxyUrl0) -> Url = couch_util:url_strip_password(Url0), - ProxyUrl = case ProxyUrl0 of - undefined -> undefined; - _ -> couch_util:url_strip_password(ProxyUrl0) - end, + ProxyUrl = + case ProxyUrl0 of + undefined -> undefined; + _ -> couch_util:url_strip_password(ProxyUrl0) + end, case gen_server:call(?MODULE, {acquire, Url, ProxyUrl}) of {ok, Worker} -> link(Worker), @@ -105,28 +107,37 @@ acquire(Url0, ProxyUrl0) -> {error, Reason} end. - release(Worker) -> unlink(Worker), gen_server:cast(?MODULE, {release, Worker}). - handle_call({acquire, Url, ProxyUrl}, From, State) -> {Pid, _Ref} = From, case {ibrowse_lib:parse_url(Url), parse_proxy_url(ProxyUrl)} of - {#url{host=Host, port=Port}, #url{host=ProxyHost, port=ProxyPort}} -> + {#url{host = Host, port = Port}, #url{host = ProxyHost, port = ProxyPort}} -> Pat = #connection{ - host=Host, port=Port, - proxy_host=ProxyHost, proxy_port=ProxyPort, - mref=undefined, _='_'}, + host = Host, + port = Port, + proxy_host = ProxyHost, + proxy_port = ProxyPort, + mref = undefined, + _ = '_' + }, case ets:match_object(?MODULE, Pat, 1) of '$end_of_table' -> {reply, {error, all_allocated}, State}; {[Worker], _Cont} -> - couch_stats:increment_counter([couch_replicator, connection, - acquires]), - ets:insert(?MODULE, Worker#connection{mref=monitor(process, - Pid)}), + couch_stats:increment_counter([ + couch_replicator, + connection, + acquires + ]), + ets:insert(?MODULE, Worker#connection{ + mref = monitor( + process, + Pid + ) + }), {reply, {ok, Worker#connection.worker}, State} end; {{error, invalid_uri}, _} -> @@ -134,26 +145,30 @@ handle_call({acquire, Url, ProxyUrl}, From, State) -> {_, {error, invalid_uri}} -> {reply, {error, invalid_uri}, State} end; - handle_call({create, Url, ProxyUrl, Worker}, From, State) -> {Pid, _Ref} = From, case {ibrowse_lib:parse_url(Url), parse_proxy_url(ProxyUrl)} of - {#url{host=Host, port=Port}, #url{host=ProxyHost, port=ProxyPort}} -> + {#url{host = Host, port = Port}, #url{host = ProxyHost, port = ProxyPort}} -> link(Worker), - couch_stats:increment_counter([couch_replicator, connection, - creates]), + couch_stats:increment_counter([ + couch_replicator, + connection, + creates + ]), true = ets:insert_new( ?MODULE, #connection{ - host=Host, port=Port, - proxy_host=ProxyHost, proxy_port=ProxyPort, - worker=Worker, - mref=monitor(process, Pid)} + host = Host, + port = Port, + proxy_host = ProxyHost, + proxy_port = ProxyPort, + worker = Worker, + mref = monitor(process, Pid) + } ), {reply, ok, State} end. - handle_cast({release, WorkerPid}, State) -> couch_stats:increment_counter([couch_replicator, connection, releases]), case ets:lookup(?MODULE, WorkerPid) of @@ -162,39 +177,45 @@ handle_cast({release, WorkerPid}, State) -> MRef when is_reference(MRef) -> demonitor(MRef, [flush]); undefined -> ok end, - ets:insert(?MODULE, Worker#connection{mref=undefined}); + ets:insert(?MODULE, Worker#connection{mref = undefined}); [] -> ok end, {noreply, State}; - handle_cast({connection_close_interval, V}, State) -> erlang:cancel_timer(State#state.timer), NewTimer = erlang:send_after(V, self(), close_idle_connections), ibrowse:add_config([{inactivity_timeout, V}]), - {noreply, State#state{close_interval=V, timer=NewTimer}}. - + {noreply, State#state{close_interval = V, timer = NewTimer}}. % owner crashed handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) -> - couch_stats:increment_counter([couch_replicator, connection, - owner_crashes]), - Conns = ets:match_object(?MODULE, #connection{mref = Ref, _='_'}), - lists:foreach(fun(Conn) -> - couch_stats:increment_counter([couch_replicator, connection, closes]), - delete_worker(Conn) - end, Conns), + couch_stats:increment_counter([ + couch_replicator, + connection, + owner_crashes + ]), + Conns = ets:match_object(?MODULE, #connection{mref = Ref, _ = '_'}), + lists:foreach( + fun(Conn) -> + couch_stats:increment_counter([couch_replicator, connection, closes]), + delete_worker(Conn) + end, + Conns + ), {noreply, State}; - % worker crashed handle_info({'EXIT', Pid, Reason}, State) -> - couch_stats:increment_counter([couch_replicator, connection, - worker_crashes]), + couch_stats:increment_counter([ + couch_replicator, + connection, + worker_crashes + ]), case ets:lookup(?MODULE, Pid) of [] -> ok; [Worker] -> - #connection{host=Host, port=Port} = Worker, + #connection{host = Host, port = Port} = Worker, maybe_log_worker_death(Host, Port, Reason), case Worker#connection.mref of MRef when is_reference(MRef) -> demonitor(MRef, [flush]); @@ -203,42 +224,38 @@ handle_info({'EXIT', Pid, Reason}, State) -> ets:delete(?MODULE, Pid) end, {noreply, State}; - handle_info(close_idle_connections, State) -> #state{ - close_interval=Interval, - timer=Timer + close_interval = Interval, + timer = Timer } = State, - Conns = ets:match_object(?MODULE, #connection{mref=undefined, _='_'}), - lists:foreach(fun(Conn) -> - couch_stats:increment_counter([couch_replicator, connection, closes]), - delete_worker(Conn) - end, Conns), + Conns = ets:match_object(?MODULE, #connection{mref = undefined, _ = '_'}), + lists:foreach( + fun(Conn) -> + couch_stats:increment_counter([couch_replicator, connection, closes]), + delete_worker(Conn) + end, + Conns + ), erlang:cancel_timer(Timer), NewTimer = erlang:send_after(Interval, self(), close_idle_connections), - {noreply, State#state{timer=NewTimer}}; - + {noreply, State#state{timer = NewTimer}}; handle_info(restart_config_listener, State) -> ok = config:listen_for_changes(?MODULE, nil), {noreply, State}. - code_change(_OldVsn, State, _Extra) -> {ok, State}. - terminate(_Reason, _State) -> ok. - maybe_log_worker_death(_Host, _Port, normal) -> ok; - maybe_log_worker_death(Host, Port, Reason) -> ErrMsg = "Replication connection to: ~p:~p died with reason ~p", couch_log:info(ErrMsg, [Host, Port, Reason]). - -spec delete_worker(#connection{}) -> ok. delete_worker(Worker) -> ets:delete(?MODULE, Worker#connection.worker), @@ -246,25 +263,19 @@ delete_worker(Worker) -> spawn(fun() -> ibrowse_http_client:stop(Worker#connection.worker) end), ok. - handle_config_change("replicator", "connection_close_interval", V, _, S) -> - ok = gen_server:cast(?MODULE, {connection_close_interval, - list_to_integer(V)}), + ok = gen_server:cast(?MODULE, {connection_close_interval, list_to_integer(V)}), {ok, S}; - handle_config_change(_, _, _, _, S) -> {ok, S}. - handle_config_terminate(_, stop, _) -> ok; - handle_config_terminate(_, _, _) -> Pid = whereis(?MODULE), erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener). - parse_proxy_url(undefined) -> - #url{host=undefined, port=undefined}; + #url{host = undefined, port = undefined}; parse_proxy_url(ProxyUrl) -> ibrowse_lib:parse_url(ProxyUrl). |