diff options
author | Rob Harrop <rob@rabbitmq.com> | 2010-11-09 22:11:32 +0000 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2010-11-09 22:11:32 +0000 |
commit | 28409088bda290347a25a61a47aaceca156c3316 (patch) | |
tree | 994d60c5a7a543002d587f98dc98ead515e5bf8a | |
parent | 59b4142070bf526299c400aef84370d022eb52b0 (diff) | |
download | rabbitmq-server-28409088bda290347a25a61a47aaceca156c3316.tar.gz |
Reworked rabbit_heartbeat to allow for reuse in the STOMP adapter
-rw-r--r-- | src/rabbit_connection_sup.erl | 13 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 14 |
2 files changed, 18 insertions, 9 deletions
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index b3821d3b..184b0245 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -83,17 +83,26 @@ start_heartbeat_fun(SupPid) -> none; (Sock, TimeoutSec) -> Parent = self(), + SendFun = + fun() -> + Frame = rabbit_binary_generator:build_heartbeat_frame(), + catch rabbit_net:send(Sock, Frame) + end, + TimeoutFun = + fun() -> + Parent ! timeout + end, {ok, Sender} = supervisor2:start_child( SupPid, {heartbeat_sender, {rabbit_heartbeat, start_heartbeat_sender, - [Parent, Sock, TimeoutSec]}, + [Sock, TimeoutSec, SendFun]}, transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), {ok, Receiver} = supervisor2:start_child( SupPid, {heartbeat_receiver, {rabbit_heartbeat, start_heartbeat_receiver, - [Parent, Sock, TimeoutSec]}, + [Sock, TimeoutSec, TimeoutFun]}, transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), {Sender, Receiver} end. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index a9945af1..8188f381 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -43,12 +43,13 @@ -export_type([heartbeaters/0]). -type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). +-type(callback_fun() :: fun (() -> any())). -spec(start_heartbeat_sender/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), callback_fun()) -> rabbit_types:ok(pid())). -spec(start_heartbeat_receiver/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), callback_fun()) -> rabbit_types:ok(pid())). -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -58,24 +59,23 @@ %%---------------------------------------------------------------------------- -start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> +start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater( {Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - catch rabbit_net:send( - Sock, rabbit_binary_generator:build_heartbeat_frame()), + SendFun(), continue end}). -start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> +start_heartbeat_receiver(Sock, TimeoutSec, TimeoutFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> - Parent ! timeout, + TimeoutFun(), stop end}). |