summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-11-10 15:39:12 +0000
committerRob Harrop <rob@rabbitmq.com>2010-11-10 15:39:12 +0000
commit1298a4035b2d1a4791530ade8a586e22c4f48b1e (patch)
tree344176129361cae482c659c61001cc7ea19e5a92
parentfcf0ec3a214a97da1d4fee3f02bd730ed73c6840 (diff)
downloadrabbitmq-server-1298a4035b2d1a4791530ade8a586e22c4f48b1e.tar.gz
Reworked start_heartbeat_fun to allow a different timeout for send/receive heartbeats
-rw-r--r--src/rabbit_heartbeat.erl44
-rw-r--r--src/rabbit_reader.erl13
2 files changed, 29 insertions, 28 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index ebdfbdc6..5f1e211e 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -41,11 +41,15 @@
-ifdef(use_specs).
-export_type([heartbeaters/0]).
+-export_type([start_heartbeat_fun/0]).
--type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})).
+-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}).
-type(send_fun() :: fun ((rabbit_net:socket()) -> any())).
-type(timeout_fun() :: fun (() -> any())).
+-type(start_heartbeat_fun() ::
+ fun((rabbit_net:socket(), non_neg_integer(), non_neg_integer()) ->
+ no_return())).
-spec(start_heartbeat_sender/3 ::
(rabbit_net:socket(), non_neg_integer(), send_fun()) ->
@@ -55,9 +59,7 @@
rabbit_types:ok(pid())).
-spec(start_heartbeat_fun/3 ::
- (pid(), send_fun(), timeout_fun()) ->
- fun((rabbit_net:socket(), non_neg_integer())
- -> heartbeaters())).
+ (pid(), send_fun(), timeout_fun()) -> start_heartbeat_fun()).
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -87,38 +89,40 @@ start_heartbeat_receiver(Sock, TimeoutSec, TimeoutFun) ->
stop
end}).
-start_heartbeat_fun(SupPid, SendFun, TimeoutFun) ->
- fun (_Sock, 0) ->
- none;
- (Sock, TimeoutSec) ->
+start_heartbeat_fun(SupPid, SendFun, ReceiveFun) ->
+ fun (Sock, SendTimeoutSec, ReceiveTimeoutSec) ->
{ok, Sender} =
- supervisor2:start_child(
- SupPid, {heartbeat_sender,
- {rabbit_heartbeat, start_heartbeat_sender,
- [Sock, TimeoutSec, SendFun]},
- transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ start_heartbeater(SendTimeoutSec, SupPid, Sock,
+ SendFun, heartbeat_sender,
+ start_heartbeat_sender),
{ok, Receiver} =
- supervisor2:start_child(
- SupPid, {heartbeat_receiver,
- {rabbit_heartbeat, start_heartbeat_receiver,
- [Sock, TimeoutSec, TimeoutFun]},
- transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
+ ReceiveFun, heartbeat_receiver,
+ start_heartbeat_receiver),
{Sender, Receiver}
end.
-pause_monitor(none) ->
+pause_monitor({_Sender, none}) ->
ok;
pause_monitor({_Sender, Receiver}) ->
Receiver ! pause,
ok.
-resume_monitor(none) ->
+resume_monitor({_Sender, none}) ->
ok;
resume_monitor({_Sender, Receiver}) ->
Receiver ! resume,
ok.
%%----------------------------------------------------------------------------
+start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
+ {ok, none};
+start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
+ supervisor2:start_child(
+ SupPid, {Name,
+ {rabbit_heartbeat, Callback,
+ [Sock, TimeoutSec, TimeoutFun]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
heartbeater(Params) ->
{ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 127467bb..54e51600 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -162,11 +162,7 @@
-ifdef(use_specs).
--type(start_heartbeat_fun() ::
- fun ((rabbit_networking:socket(), non_neg_integer()) ->
- rabbit_heartbeat:heartbeaters())).
-
--spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) ->
+-spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) ->
rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
@@ -177,9 +173,10 @@
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
--spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()).
+-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
+ -> no_return()).
-spec(start_connection/7 ::
- (pid(), pid(), pid(), start_heartbeat_fun(), any(),
+ (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(),
rabbit_networking:socket(),
fun ((rabbit_networking:socket()) ->
rabbit_types:ok_or_error2(
@@ -771,7 +768,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- Heartbeater = SHF(Sock, ClientHeartbeat),
+ Heartbeater = SHF(Sock, ClientHeartbeat, ClientHeartbeat),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,