diff options
author | Tim Watson <tim@rabbitmq.com> | 2013-10-10 15:15:05 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2013-10-10 15:15:05 +0100 |
commit | 42d7a8154cbfe74c0e626fdc43499dfa0a4f570f (patch) | |
tree | a647fd6b44331e5eecd656a5cea0252a50dd5d55 | |
parent | c48fa989303bd235f16c60168988818426a7d7c0 (diff) | |
download | rabbitmq-server-42d7a8154cbfe74c0e626fdc43499dfa0a4f570f.tar.gz |
Introduce rabbit_heartbeat:start/6 API
-rw-r--r-- | src/rabbit_connection_sup.erl | 4 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 17 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 45 |
3 files changed, 35 insertions, 31 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 0c6dce76..ebdab666 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -55,9 +55,7 @@ start_link() -> {ok, ReaderPid} = supervisor2:start_child( SupPid, - {reader, {rabbit_reader, start_link, - [IntermediateSup, - rabbit_heartbeat:start_heartbeat_fun(IntermediateSup)]}, + {reader, {rabbit_reader, start_link, [IntermediateSup]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index fac74edb..eefab3bc 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -16,6 +16,7 @@ -module(rabbit_heartbeat). +-export([start/6]). -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]). @@ -39,6 +40,11 @@ non_neg_integer(), heartbeat_callback()) -> no_return())). +-spec(start/6 :: + (pid(), rabbit_net:socket(), + non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> heartbeaters()). + -spec(start_heartbeat_sender/3 :: (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). @@ -61,6 +67,17 @@ %%---------------------------------------------------------------------------- +start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + {ok, Sender} = + start_heartbeater(SendTimeoutSec, SupPid, Sock, + SendFun, heartbeat_sender, + start_heartbeat_sender), + {ok, Receiver} = + start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, + ReceiveFun, heartbeat_receiver, + start_heartbeat_receiver), + {Sender, Receiver}. + start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 574bf882..47eb603f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -18,12 +18,12 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/2, info_keys/0, info/1, info/2, force_event_refresh/1, +-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/3, mainloop/2, recvloop/2]). +-export([init/2, mainloop/2, recvloop/2]). -export([conserve_resources/3, server_properties/1]). @@ -37,8 +37,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, helper_sup, queue_collector, heartbeater, - stats_timer, channel_sup_sup_pid, start_heartbeat_fun, - buf, buf_len, throttle}). + stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, vhost, @@ -74,8 +73,7 @@ -ifdef(use_specs). --spec(start_link/2 :: (pid(), rabbit_heartbeat:start_heartbeat_fun()) -> - rabbit_types:ok(pid())). +-spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). @@ -86,11 +84,9 @@ rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) - -> no_return()). --spec(start_connection/6 :: - (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(), - rabbit_net:socket(), +-spec(init/2 :: (pid(), pid()) -> no_return()). +-spec(start_connection/5 :: + (pid(), pid(), any(), rabbit_net:socket(), fun ((rabbit_net:socket()) -> rabbit_types:ok_or_error2( rabbit_net:socket(), any()))) -> no_return()). @@ -104,20 +100,17 @@ %%-------------------------------------------------------------------------- -start_link(HelperSup, StartHeartbeatFun) -> - {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, - StartHeartbeatFun])}. +start_link(HelperSup) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, HelperSup, StartHeartbeatFun) -> +init(Parent, HelperSup) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> - start_connection( - Parent, HelperSup, StartHeartbeatFun, Deb, Sock, - SockTransform) + start_connection(Parent, HelperSup, Deb, Sock, SockTransform) end. system_continue(Parent, Deb, State) -> @@ -205,8 +198,7 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, HelperSup, StartHeartbeatFun, - Deb, Sock, SockTransform) -> +start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), Name = case rabbit_net:connection_string(Sock, inbound) of {ok, Str} -> Str; @@ -246,7 +238,6 @@ start_connection(Parent, HelperSup, StartHeartbeatFun, helper_sup = HelperSup, heartbeater = none, channel_sup_sup_pid = none, - start_heartbeat_fun = StartHeartbeatFun, buf = [], buf_len = 0, throttle = #throttle{ @@ -852,8 +843,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, State = #v1{connection_state = tuning, connection = Connection, helper_sup = SupPid, - sock = Sock, - start_heartbeat_fun = SHF}) -> + sock = Sock}) -> ServerFrameMax = server_frame_max(), if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE -> rabbit_misc:protocol_error( @@ -870,8 +860,9 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), ReceiveFun = fun() -> Parent ! heartbeat_timeout end, - Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, - ClientHeartbeat, ReceiveFun), + Heartbeater = + rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, + SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, @@ -1109,8 +1100,6 @@ pack_for_1_0(#v1{parent = Parent, pending_recv = PendingRecv, queue_collector = QueueCollector, helper_sup = SupPid, - start_heartbeat_fun = SHF, buf = Buf, buf_len = BufLen}) -> - {Parent, Sock, RecvLen, PendingRecv, QueueCollector, SupPid, SHF, - Buf, BufLen}. + {Parent, Sock, RecvLen, PendingRecv, QueueCollector, SupPid, Buf, BufLen}. |