summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-10-10 15:15:05 +0100
committerTim Watson <tim@rabbitmq.com>2013-10-10 15:15:05 +0100
commit42d7a8154cbfe74c0e626fdc43499dfa0a4f570f (patch)
treea647fd6b44331e5eecd656a5cea0252a50dd5d55
parentc48fa989303bd235f16c60168988818426a7d7c0 (diff)
downloadrabbitmq-server-42d7a8154cbfe74c0e626fdc43499dfa0a4f570f.tar.gz
Introduce rabbit_heartbeat:start/6 API
-rw-r--r--src/rabbit_connection_sup.erl4
-rw-r--r--src/rabbit_heartbeat.erl17
-rw-r--r--src/rabbit_reader.erl45
3 files changed, 35 insertions, 31 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 0c6dce76..ebdab666 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -55,9 +55,7 @@ start_link() ->
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
- {reader, {rabbit_reader, start_link,
- [IntermediateSup,
- rabbit_heartbeat:start_heartbeat_fun(IntermediateSup)]},
+ {reader, {rabbit_reader, start_link, [IntermediateSup]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index fac74edb..eefab3bc 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -16,6 +16,7 @@
-module(rabbit_heartbeat).
+-export([start/6]).
-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]).
@@ -39,6 +40,11 @@
non_neg_integer(), heartbeat_callback()) ->
no_return())).
+-spec(start/6 ::
+ (pid(), rabbit_net:socket(),
+ non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
+
-spec(start_heartbeat_sender/3 ::
(rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
rabbit_types:ok(pid())).
@@ -61,6 +67,17 @@
%%----------------------------------------------------------------------------
+start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ {ok, Sender} =
+ start_heartbeater(SendTimeoutSec, SupPid, Sock,
+ SendFun, heartbeat_sender,
+ start_heartbeat_sender),
+ {ok, Receiver} =
+ start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
+ ReceiveFun, heartbeat_receiver,
+ start_heartbeat_receiver),
+ {Sender, Receiver}.
+
start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 574bf882..47eb603f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -18,12 +18,12 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/2, info_keys/0, info/1, info/2, force_event_refresh/1,
+-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1,
shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/3, mainloop/2, recvloop/2]).
+-export([init/2, mainloop/2, recvloop/2]).
-export([conserve_resources/3, server_properties/1]).
@@ -37,8 +37,7 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, helper_sup, queue_collector, heartbeater,
- stats_timer, channel_sup_sup_pid, start_heartbeat_fun,
- buf, buf_len, throttle}).
+ stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, vhost,
@@ -74,8 +73,7 @@
-ifdef(use_specs).
--spec(start_link/2 :: (pid(), rabbit_heartbeat:start_heartbeat_fun()) ->
- rabbit_types:ok(pid())).
+-spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
@@ -86,11 +84,9 @@
rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
--spec(init/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
- -> no_return()).
--spec(start_connection/6 ::
- (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(),
- rabbit_net:socket(),
+-spec(init/2 :: (pid(), pid()) -> no_return()).
+-spec(start_connection/5 ::
+ (pid(), pid(), any(), rabbit_net:socket(),
fun ((rabbit_net:socket()) ->
rabbit_types:ok_or_error2(
rabbit_net:socket(), any()))) -> no_return()).
@@ -104,20 +100,17 @@
%%--------------------------------------------------------------------------
-start_link(HelperSup, StartHeartbeatFun) ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup,
- StartHeartbeatFun])}.
+start_link(HelperSup) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, HelperSup, StartHeartbeatFun) ->
+init(Parent, HelperSup) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
- start_connection(
- Parent, HelperSup, StartHeartbeatFun, Deb, Sock,
- SockTransform)
+ start_connection(Parent, HelperSup, Deb, Sock, SockTransform)
end.
system_continue(Parent, Deb, State) ->
@@ -205,8 +198,7 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
-start_connection(Parent, HelperSup, StartHeartbeatFun,
- Deb, Sock, SockTransform) ->
+start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = case rabbit_net:connection_string(Sock, inbound) of
{ok, Str} -> Str;
@@ -246,7 +238,6 @@ start_connection(Parent, HelperSup, StartHeartbeatFun,
helper_sup = HelperSup,
heartbeater = none,
channel_sup_sup_pid = none,
- start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
throttle = #throttle{
@@ -852,8 +843,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
State = #v1{connection_state = tuning,
connection = Connection,
helper_sup = SupPid,
- sock = Sock,
- start_heartbeat_fun = SHF}) ->
+ sock = Sock}) ->
ServerFrameMax = server_frame_max(),
if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE ->
rabbit_misc:protocol_error(
@@ -870,8 +860,9 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
- ClientHeartbeat, ReceiveFun),
+ Heartbeater =
+ rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
+ SendFun, ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
@@ -1109,8 +1100,6 @@ pack_for_1_0(#v1{parent = Parent,
pending_recv = PendingRecv,
queue_collector = QueueCollector,
helper_sup = SupPid,
- start_heartbeat_fun = SHF,
buf = Buf,
buf_len = BufLen}) ->
- {Parent, Sock, RecvLen, PendingRecv, QueueCollector, SupPid, SHF,
- Buf, BufLen}.
+ {Parent, Sock, RecvLen, PendingRecv, QueueCollector, SupPid, Buf, BufLen}.