summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-17 16:43:07 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-17 16:43:07 +0000
commitefdbf0a822e141a26d1306011a397c2638584ff9 (patch)
treee227d8c8f6569dbe6436edd9a6a064937f7a09a0
parentf90640eda347e7dc1756f5dbe747628816614fc8 (diff)
downloadrabbitmq-server-efdbf0a822e141a26d1306011a397c2638584ff9.tar.gz
allow 'become' to return, and become something else
-rw-r--r--src/rabbit_reader.erl20
1 files changed, 12 insertions, 8 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a1dfeeff..7b867c8e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -23,7 +23,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/2]).
+-export([init/4, mainloop/2, recvloop/2]).
-export([conserve_resources/3, server_properties/1]).
@@ -240,13 +240,12 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
last_blocked_at = never}},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
- recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
- State, #v1.stats_timer),
- handshake, 8)),
+ run({?MODULE, recvloop,
+ [Deb, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}),
log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
- throw:{become, M, F, A} ->
- apply(M, F, A);
Ex -> log(case Ex of
connection_closed_abruptly -> warning;
_ -> error
@@ -265,6 +264,11 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
end,
done.
+run({M, F, A}) ->
+ try apply(M, F, A)
+ catch {become, MFA} -> run(MFA)
+ end.
+
recvloop(Deb, State = #v1{pending_recv = true}) ->
mainloop(Deb, State);
recvloop(Deb, State = #v1{connection_state = blocked}) ->
@@ -998,8 +1002,8 @@ emit_stats(State) ->
become_1_0(Mode, Version, State = #v1{sock = Sock}) ->
case code:is_loaded(rabbit_amqp1_0_reader) of
false -> refuse_connection(Sock, {bad_version, Version});
- _ -> throw({become, rabbit_amqp1_0_reader, become,
- [Mode, pack_for_1_0(State)]})
+ _ -> throw({become, {rabbit_amqp1_0_reader, become,
+ [Mode, pack_for_1_0(State)]}})
end.
pack_for_1_0(#v1{parent = Parent,