summaryrefslogtreecommitdiff
path: root/src/couch_replicator/src/couch_replicator_connection.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_connection.erl')
-rw-r--r--src/couch_replicator/src/couch_replicator_connection.erl171
1 files changed, 91 insertions, 80 deletions
diff --git a/src/couch_replicator/src/couch_replicator_connection.erl b/src/couch_replicator/src/couch_replicator_connection.erl
index d1c3d93fc..a158d2609 100644
--- a/src/couch_replicator/src/couch_replicator_connection.erl
+++ b/src/couch_replicator/src/couch_replicator_connection.erl
@@ -20,18 +20,18 @@
]).
-export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_info/2,
+ handle_cast/2,
+ code_change/3
]).
-export([
- acquire/1,
- acquire/2,
- release/1
+ acquire/1,
+ acquire/2,
+ release/1
]).
-export([
@@ -44,7 +44,6 @@
-define(DEFAULT_CLOSE_INTERVAL, 90000).
-define(RELISTEN_DELAY, 5000).
-
-record(state, {
close_interval,
timer
@@ -59,40 +58,43 @@
mref
}).
-
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
init([]) ->
process_flag(trap_exit, true),
- ?MODULE = ets:new(?MODULE, [named_table, public,
- {keypos, #connection.worker}]),
+ ?MODULE = ets:new(?MODULE, [
+ named_table,
+ public,
+ {keypos, #connection.worker}
+ ]),
ok = config:listen_for_changes(?MODULE, nil),
- Interval = config:get_integer("replicator", "connection_close_interval",
- ?DEFAULT_CLOSE_INTERVAL),
+ Interval = config:get_integer(
+ "replicator",
+ "connection_close_interval",
+ ?DEFAULT_CLOSE_INTERVAL
+ ),
Timer = erlang:send_after(Interval, self(), close_idle_connections),
ibrowse:add_config([
{inactivity_timeout, Interval},
{worker_trap_exits, false}
]),
- {ok, #state{close_interval=Interval, timer=Timer}}.
+ {ok, #state{close_interval = Interval, timer = Timer}}.
acquire(Url) ->
acquire(Url, undefined).
acquire(Url, ProxyUrl) when is_binary(Url) ->
acquire(binary_to_list(Url), ProxyUrl);
-
acquire(Url, ProxyUrl) when is_binary(ProxyUrl) ->
acquire(Url, binary_to_list(ProxyUrl));
-
acquire(Url0, ProxyUrl0) ->
Url = couch_util:url_strip_password(Url0),
- ProxyUrl = case ProxyUrl0 of
- undefined -> undefined;
- _ -> couch_util:url_strip_password(ProxyUrl0)
- end,
+ ProxyUrl =
+ case ProxyUrl0 of
+ undefined -> undefined;
+ _ -> couch_util:url_strip_password(ProxyUrl0)
+ end,
case gen_server:call(?MODULE, {acquire, Url, ProxyUrl}) of
{ok, Worker} ->
link(Worker),
@@ -105,28 +107,37 @@ acquire(Url0, ProxyUrl0) ->
{error, Reason}
end.
-
release(Worker) ->
unlink(Worker),
gen_server:cast(?MODULE, {release, Worker}).
-
handle_call({acquire, Url, ProxyUrl}, From, State) ->
{Pid, _Ref} = From,
case {ibrowse_lib:parse_url(Url), parse_proxy_url(ProxyUrl)} of
- {#url{host=Host, port=Port}, #url{host=ProxyHost, port=ProxyPort}} ->
+ {#url{host = Host, port = Port}, #url{host = ProxyHost, port = ProxyPort}} ->
Pat = #connection{
- host=Host, port=Port,
- proxy_host=ProxyHost, proxy_port=ProxyPort,
- mref=undefined, _='_'},
+ host = Host,
+ port = Port,
+ proxy_host = ProxyHost,
+ proxy_port = ProxyPort,
+ mref = undefined,
+ _ = '_'
+ },
case ets:match_object(?MODULE, Pat, 1) of
'$end_of_table' ->
{reply, {error, all_allocated}, State};
{[Worker], _Cont} ->
- couch_stats:increment_counter([couch_replicator, connection,
- acquires]),
- ets:insert(?MODULE, Worker#connection{mref=monitor(process,
- Pid)}),
+ couch_stats:increment_counter([
+ couch_replicator,
+ connection,
+ acquires
+ ]),
+ ets:insert(?MODULE, Worker#connection{
+ mref = monitor(
+ process,
+ Pid
+ )
+ }),
{reply, {ok, Worker#connection.worker}, State}
end;
{{error, invalid_uri}, _} ->
@@ -134,26 +145,30 @@ handle_call({acquire, Url, ProxyUrl}, From, State) ->
{_, {error, invalid_uri}} ->
{reply, {error, invalid_uri}, State}
end;
-
handle_call({create, Url, ProxyUrl, Worker}, From, State) ->
{Pid, _Ref} = From,
case {ibrowse_lib:parse_url(Url), parse_proxy_url(ProxyUrl)} of
- {#url{host=Host, port=Port}, #url{host=ProxyHost, port=ProxyPort}} ->
+ {#url{host = Host, port = Port}, #url{host = ProxyHost, port = ProxyPort}} ->
link(Worker),
- couch_stats:increment_counter([couch_replicator, connection,
- creates]),
+ couch_stats:increment_counter([
+ couch_replicator,
+ connection,
+ creates
+ ]),
true = ets:insert_new(
?MODULE,
#connection{
- host=Host, port=Port,
- proxy_host=ProxyHost, proxy_port=ProxyPort,
- worker=Worker,
- mref=monitor(process, Pid)}
+ host = Host,
+ port = Port,
+ proxy_host = ProxyHost,
+ proxy_port = ProxyPort,
+ worker = Worker,
+ mref = monitor(process, Pid)
+ }
),
{reply, ok, State}
end.
-
handle_cast({release, WorkerPid}, State) ->
couch_stats:increment_counter([couch_replicator, connection, releases]),
case ets:lookup(?MODULE, WorkerPid) of
@@ -162,39 +177,45 @@ handle_cast({release, WorkerPid}, State) ->
MRef when is_reference(MRef) -> demonitor(MRef, [flush]);
undefined -> ok
end,
- ets:insert(?MODULE, Worker#connection{mref=undefined});
+ ets:insert(?MODULE, Worker#connection{mref = undefined});
[] ->
ok
end,
{noreply, State};
-
handle_cast({connection_close_interval, V}, State) ->
erlang:cancel_timer(State#state.timer),
NewTimer = erlang:send_after(V, self(), close_idle_connections),
ibrowse:add_config([{inactivity_timeout, V}]),
- {noreply, State#state{close_interval=V, timer=NewTimer}}.
-
+ {noreply, State#state{close_interval = V, timer = NewTimer}}.
% owner crashed
handle_info({'DOWN', Ref, process, _Pid, _Reason}, State) ->
- couch_stats:increment_counter([couch_replicator, connection,
- owner_crashes]),
- Conns = ets:match_object(?MODULE, #connection{mref = Ref, _='_'}),
- lists:foreach(fun(Conn) ->
- couch_stats:increment_counter([couch_replicator, connection, closes]),
- delete_worker(Conn)
- end, Conns),
+ couch_stats:increment_counter([
+ couch_replicator,
+ connection,
+ owner_crashes
+ ]),
+ Conns = ets:match_object(?MODULE, #connection{mref = Ref, _ = '_'}),
+ lists:foreach(
+ fun(Conn) ->
+ couch_stats:increment_counter([couch_replicator, connection, closes]),
+ delete_worker(Conn)
+ end,
+ Conns
+ ),
{noreply, State};
-
% worker crashed
handle_info({'EXIT', Pid, Reason}, State) ->
- couch_stats:increment_counter([couch_replicator, connection,
- worker_crashes]),
+ couch_stats:increment_counter([
+ couch_replicator,
+ connection,
+ worker_crashes
+ ]),
case ets:lookup(?MODULE, Pid) of
[] ->
ok;
[Worker] ->
- #connection{host=Host, port=Port} = Worker,
+ #connection{host = Host, port = Port} = Worker,
maybe_log_worker_death(Host, Port, Reason),
case Worker#connection.mref of
MRef when is_reference(MRef) -> demonitor(MRef, [flush]);
@@ -203,42 +224,38 @@ handle_info({'EXIT', Pid, Reason}, State) ->
ets:delete(?MODULE, Pid)
end,
{noreply, State};
-
handle_info(close_idle_connections, State) ->
#state{
- close_interval=Interval,
- timer=Timer
+ close_interval = Interval,
+ timer = Timer
} = State,
- Conns = ets:match_object(?MODULE, #connection{mref=undefined, _='_'}),
- lists:foreach(fun(Conn) ->
- couch_stats:increment_counter([couch_replicator, connection, closes]),
- delete_worker(Conn)
- end, Conns),
+ Conns = ets:match_object(?MODULE, #connection{mref = undefined, _ = '_'}),
+ lists:foreach(
+ fun(Conn) ->
+ couch_stats:increment_counter([couch_replicator, connection, closes]),
+ delete_worker(Conn)
+ end,
+ Conns
+ ),
erlang:cancel_timer(Timer),
NewTimer = erlang:send_after(Interval, self(), close_idle_connections),
- {noreply, State#state{timer=NewTimer}};
-
+ {noreply, State#state{timer = NewTimer}};
handle_info(restart_config_listener, State) ->
ok = config:listen_for_changes(?MODULE, nil),
{noreply, State}.
-
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
terminate(_Reason, _State) ->
ok.
-
maybe_log_worker_death(_Host, _Port, normal) ->
ok;
-
maybe_log_worker_death(Host, Port, Reason) ->
ErrMsg = "Replication connection to: ~p:~p died with reason ~p",
couch_log:info(ErrMsg, [Host, Port, Reason]).
-
-spec delete_worker(#connection{}) -> ok.
delete_worker(Worker) ->
ets:delete(?MODULE, Worker#connection.worker),
@@ -246,25 +263,19 @@ delete_worker(Worker) ->
spawn(fun() -> ibrowse_http_client:stop(Worker#connection.worker) end),
ok.
-
handle_config_change("replicator", "connection_close_interval", V, _, S) ->
- ok = gen_server:cast(?MODULE, {connection_close_interval,
- list_to_integer(V)}),
+ ok = gen_server:cast(?MODULE, {connection_close_interval, list_to_integer(V)}),
{ok, S};
-
handle_config_change(_, _, _, _, S) ->
{ok, S}.
-
handle_config_terminate(_, stop, _) ->
ok;
-
handle_config_terminate(_, _, _) ->
Pid = whereis(?MODULE),
erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
-
parse_proxy_url(undefined) ->
- #url{host=undefined, port=undefined};
+ #url{host = undefined, port = undefined};
parse_proxy_url(ProxyUrl) ->
ibrowse_lib:parse_url(ProxyUrl).