summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-09 18:45:37 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-09 18:45:37 +0100
commitd08175070c9e38e9d9993ea8e1abf046c4b95732 (patch)
tree54107fd6357807f731b7ce1bcdd0c41637b2bfab
parentbb0832b6e8fb747b56c7e4fad91b3e14909dab38 (diff)
downloadrabbitmq-server-d08175070c9e38e9d9993ea8e1abf046c4b95732.tar.gz
Fixes all over. Still more to do, though mainly API fixes. Functionality seems to be correct.
-rw-r--r--src/rabbit_channel.erl15
-rw-r--r--src/rabbit_channel_sup.erl4
-rw-r--r--src/rabbit_framing_channel.erl5
-rw-r--r--src/rabbit_heartbeat.erl54
-rw-r--r--src/rabbit_limiter.erl9
-rw-r--r--src/rabbit_reader.erl14
-rw-r--r--src/rabbit_writer.erl34
7 files changed, 63 insertions, 72 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e93b6665..c45b2cc7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -153,8 +153,6 @@ flush(Pid) ->
init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost,
CollectorPid]) ->
- process_flag(trap_exit, true),
- link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
State = #ch{state = starting,
channel = Channel,
@@ -243,12 +241,6 @@ handle_cast(emit_stats, State) ->
internal_emit_stats(State),
{noreply, State}.
-handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
- State = #ch{writer_pid = WriterPid}) ->
- State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
- {stop, normal, State};
-handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State)}.
@@ -1025,7 +1017,7 @@ fold_per_queue(F, Acc0, UAQ) ->
start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) ->
Me = self(),
{ok, LPid} =
- supervisor:start_child(
+ supervisor2:start_child(
ParentPid,
{limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]},
transient, ?MAX_WAIT, worker, [rabbit_limiter]}),
@@ -1100,10 +1092,9 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
-terminate(#ch{limiter_pid = LimiterPid}) ->
+terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
- rabbit_event:notify(channel_closed, [{pid, self()}]),
- rabbit_limiter:shutdown(LimiterPid).
+ rabbit_event:notify(channel_closed, [{pid, self()}]).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 9e68b497..9011db73 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -60,7 +60,7 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
supervisor2:start_child(
SupPid,
{writer, {rabbit_writer, start_link,
- [Sock, Channel, FrameMax, Protocol]},
+ [Sock, Channel, FrameMax, Protocol, ReaderPid]},
intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}),
{ok, ChannelPid} =
supervisor2:start_child(
@@ -73,7 +73,7 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
supervisor2:start_child(
SupPid,
{framing_channel, {rabbit_framing_channel, start_link,
- [ChannelPid, Protocol]},
+ [ReaderPid, ChannelPid, Protocol]},
intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
{ok, SupPid, FramingChannelPid}.
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 08aaafe1..cb53185f 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -32,15 +32,14 @@
-module(rabbit_framing_channel).
-include("rabbit.hrl").
--export([start_link/2, process/2, shutdown/1]).
+-export([start_link/3, process/2, shutdown/1]).
%% internal
-export([mainloop/3]).
%%--------------------------------------------------------------------
-start_link(ChannelPid, Protocol) ->
- Parent = self(),
+start_link(Parent, ChannelPid, Protocol) ->
{ok, proc_lib:spawn_link(
fun () -> mainloop(Parent, ChannelPid, Protocol) end)}.
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index b277de70..264dbb68 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -32,8 +32,8 @@
-module(rabbit_heartbeat).
-export([start_heartbeat/3, pause_monitor/1, resume_monitor/1,
- start_heartbeat_sender/2,
- start_heartbeat_receiver/2]).
+ start_heartbeat_sender/3,
+ start_heartbeat_receiver/3]).
-include("rabbit.hrl").
@@ -45,9 +45,11 @@
-spec(start_heartbeat/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) ->
pids()).
--spec(start_heartbeat_sender/2 :: (rabbit_net:socket(), non_neg_integer()) ->
+-spec(start_heartbeat_sender/3 ::
+ (pid(), rabbit_net:socket(), non_neg_integer()) ->
rabbit_types:ok(pid())).
--spec(start_heartbeat_receiver/2 :: (rabbit_net:socket(), non_neg_integer()) ->
+-spec(start_heartbeat_receiver/3 ::
+ (pid(), rabbit_net:socket(), non_neg_integer()) ->
rabbit_types:ok(pid())).
-spec(pause_monitor/1 :: (pids()) -> 'ok').
@@ -60,41 +62,41 @@
start_heartbeat(_Sup, _Sock, 0) ->
none;
start_heartbeat(Sup, Sock, TimeoutSec) ->
+ Parent = self(),
{ok, Sender} =
- supervisor:start_child(
+ supervisor2:start_child(
Sup, {heartbeat_sender,
- {?MODULE, start_heartbeat_sender, [Sock, TimeoutSec]},
- permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {?MODULE, start_heartbeat_sender, [Parent, Sock, TimeoutSec]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
{ok, Receiver} =
- supervisor:start_child(
+ supervisor2:start_child(
Sup, {heartbeat_receiver,
- {?MODULE, start_heartbeat_receiver, [Sock, TimeoutSec]},
- permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {?MODULE, start_heartbeat_receiver, [Parent, Sock, TimeoutSec]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
{Sender, Receiver}.
-start_heartbeat_sender(Sock, TimeoutSec) ->
+start_heartbeat_sender(_Parent, Sock, TimeoutSec) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- Parent = self(),
{ok, proc_lib:spawn_link(
- fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2,
- send_oct, 0,
+ fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
fun () ->
catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
continue
- end}, Parent)
+ end}, {0, 0})
end)}.
-start_heartbeat_receiver(Sock, TimeoutSec) ->
+start_heartbeat_receiver(Parent, Sock, TimeoutSec) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- Parent = self(),
{ok, proc_lib:spawn_link(
- fun () -> heartbeater({Sock, TimeoutSec * 1000,
- recv_oct, 1,
- fun () -> exit(timeout) end}, Parent) end)}.
+ fun () -> heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
+ fun () ->
+ Parent ! timeout,
+ stop
+ end}, {0, 0}) end)}.
pause_monitor(none) ->
ok;
@@ -110,19 +112,12 @@ resume_monitor({_Sender, Receiver}) ->
%%----------------------------------------------------------------------------
-heartbeater(Params, Parent) ->
- heartbeater(Params, erlang:monitor(process, Parent), {0, 0}).
-
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
- MonitorRef, {StatVal, SameCount}) ->
- Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
+ {StatVal, SameCount}) ->
+ Recurse = fun (V) -> heartbeater(Params, V) end,
receive
- {'DOWN', MonitorRef, process, _Object, _Info} ->
- ok;
pause ->
receive
- {'DOWN', MonitorRef, process, _Object, _Info} ->
- ok;
resume ->
Recurse({0, 0});
Other ->
@@ -139,6 +134,7 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
Recurse({NewStatVal, SameCount + 1});
true ->
case Handler() of
+ stop -> ok;
continue -> Recurse({NewStatVal, 0})
end
end;
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 813ccc75..fccf5f34 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -35,7 +35,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
--export([start_link/2, shutdown/1]).
+-export([start_link/2]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1]).
@@ -46,7 +46,6 @@
-type(maybe_pid() :: pid() | 'undefined').
-spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok(pid())).
--spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
@@ -76,12 +75,6 @@
start_link(ChPid, UnackedMsgCount) ->
gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []).
-shutdown(undefined) ->
- ok;
-shutdown(LimiterPid) ->
- true = unlink(LimiterPid),
- gen_server2:cast(LimiterPid, shutdown).
-
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 6e336c86..e0f4d6ec 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -372,6 +372,8 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
true ->
throw({handshake_timeout, State#v1.callback})
end;
+ timeout ->
+ throw({timeout, State#v1.connection_state});
{'$gen_call', From, {shutdown, Explanation}} ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
@@ -460,9 +462,9 @@ handle_channel_exit(Channel, Reason, State) ->
handle_dependent_exit(Pid, Reason, State) ->
case (case Reason of
+ normal -> controlled;
shutdown -> controlled;
{shutdown, _Term} -> controlled;
- normal -> controlled;
_ -> uncontrolled
end) of
controlled ->
@@ -878,14 +880,14 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
- {ok, _ChanSup, FrChPid} =
+ {ok, _ChanSup, ChPid} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, [Protocol, Sock, Channel, FrameMax,
self(), Username, VHost, Collector]),
- link(FrChPid),
- put({channel, Channel}, {chpid, FrChPid}),
- put({chpid, FrChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(FrChPid, AnalyzedFrame).
+ link(ChPid),
+ put({channel, Channel}, {chpid, ChPid}),
+ put({chpid, ChPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 483b46f7..faeec406 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/4, start_link/4, flush/1, mainloop/1]).
+-export([start/5, start_link/5, flush/1, mainloop/2, mainloop1/2]).
-export([send_command/2, send_command/3, send_command_and_signal_back/3,
send_command_and_signal_back/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
@@ -48,13 +48,13 @@
-ifdef(use_specs).
--spec(start/4 ::
+-spec(start/5 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol())
+ non_neg_integer(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
--spec(start_link/4 ::
+-spec(start_link/5 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol())
+ non_neg_integer(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
@@ -85,25 +85,35 @@
%%----------------------------------------------------------------------------
-start(Sock, Channel, FrameMax, Protocol) ->
+start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
{ok,
- proc_lib:spawn(?MODULE, mainloop, [#wstate{sock = Sock,
+ proc_lib:spawn(?MODULE, mainloop, [ReaderPid,
+ #wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol}])}.
-start_link(Sock, Channel, FrameMax, Protocol) ->
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
{ok,
- proc_lib:spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+ proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid,
+ #wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol}])}.
-mainloop(State) ->
+mainloop(ReaderPid, State) ->
+ try
+ mainloop1(ReaderPid, State)
+ catch
+ exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error}
+ end,
+ done.
+
+mainloop1(ReaderPid, State) ->
receive
- Message -> ?MODULE:mainloop(handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop, [State])
+ erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
end.
handle_message({send_command, MethodRecord},