diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-09 18:45:37 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-09 18:45:37 +0100 |
commit | d08175070c9e38e9d9993ea8e1abf046c4b95732 (patch) | |
tree | 54107fd6357807f731b7ce1bcdd0c41637b2bfab | |
parent | bb0832b6e8fb747b56c7e4fad91b3e14909dab38 (diff) | |
download | rabbitmq-server-d08175070c9e38e9d9993ea8e1abf046c4b95732.tar.gz |
Fixes all over. Still more to do, though mainly API fixes. Functionality seems to be correct.
-rw-r--r-- | src/rabbit_channel.erl | 15 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 4 | ||||
-rw-r--r-- | src/rabbit_framing_channel.erl | 5 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 54 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 9 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 14 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 34 |
7 files changed, 63 insertions, 72 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e93b6665..c45b2cc7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -153,8 +153,6 @@ flush(Pid) -> init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> - process_flag(trap_exit, true), - link(WriterPid), ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, channel = Channel, @@ -243,12 +241,6 @@ handle_cast(emit_stats, State) -> internal_emit_stats(State), {noreply, State}. -handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, - State = #ch{writer_pid = WriterPid}) -> - State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, - {stop, normal, State}; -handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State)}. @@ -1025,7 +1017,7 @@ fold_per_queue(F, Acc0, UAQ) -> start_limiter(State = #ch{unacked_message_q = UAMQ, parent_pid = ParentPid}) -> Me = self(), {ok, LPid} = - supervisor:start_child( + supervisor2:start_child( ParentPid, {limiter, {rabbit_limiter, start_link, [Me, queue:len(UAMQ)]}, transient, ?MAX_WAIT, worker, [rabbit_limiter]}), @@ -1100,10 +1092,9 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, false -> rabbit_writer:send_command(WriterPid, M, Content) end. -terminate(#ch{limiter_pid = LimiterPid}) -> +terminate(_State) -> pg_local:leave(rabbit_channels, self()), - rabbit_event:notify(channel_closed, [{pid, self()}]), - rabbit_limiter:shutdown(LimiterPid). + rabbit_event:notify(channel_closed, [{pid, self()}]). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 9e68b497..9011db73 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -60,7 +60,7 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, supervisor2:start_child( SupPid, {writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol]}, + [Sock, Channel, FrameMax, Protocol, ReaderPid]}, intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}), {ok, ChannelPid} = supervisor2:start_child( @@ -73,7 +73,7 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, supervisor2:start_child( SupPid, {framing_channel, {rabbit_framing_channel, start_link, - [ChannelPid, Protocol]}, + [ReaderPid, ChannelPid, Protocol]}, intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}), {ok, SupPid, FramingChannelPid}. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 08aaafe1..cb53185f 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,15 +32,14 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/2, process/2, shutdown/1]). +-export([start_link/3, process/2, shutdown/1]). %% internal -export([mainloop/3]). %%-------------------------------------------------------------------- -start_link(ChannelPid, Protocol) -> - Parent = self(), +start_link(Parent, ChannelPid, Protocol) -> {ok, proc_lib:spawn_link( fun () -> mainloop(Parent, ChannelPid, Protocol) end)}. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index b277de70..264dbb68 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -32,8 +32,8 @@ -module(rabbit_heartbeat). -export([start_heartbeat/3, pause_monitor/1, resume_monitor/1, - start_heartbeat_sender/2, - start_heartbeat_receiver/2]). + start_heartbeat_sender/3, + start_heartbeat_receiver/3]). -include("rabbit.hrl"). @@ -45,9 +45,11 @@ -spec(start_heartbeat/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) -> pids()). --spec(start_heartbeat_sender/2 :: (rabbit_net:socket(), non_neg_integer()) -> +-spec(start_heartbeat_sender/3 :: + (pid(), rabbit_net:socket(), non_neg_integer()) -> rabbit_types:ok(pid())). --spec(start_heartbeat_receiver/2 :: (rabbit_net:socket(), non_neg_integer()) -> +-spec(start_heartbeat_receiver/3 :: + (pid(), rabbit_net:socket(), non_neg_integer()) -> rabbit_types:ok(pid())). -spec(pause_monitor/1 :: (pids()) -> 'ok'). @@ -60,41 +62,41 @@ start_heartbeat(_Sup, _Sock, 0) -> none; start_heartbeat(Sup, Sock, TimeoutSec) -> + Parent = self(), {ok, Sender} = - supervisor:start_child( + supervisor2:start_child( Sup, {heartbeat_sender, - {?MODULE, start_heartbeat_sender, [Sock, TimeoutSec]}, - permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {?MODULE, start_heartbeat_sender, [Parent, Sock, TimeoutSec]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), {ok, Receiver} = - supervisor:start_child( + supervisor2:start_child( Sup, {heartbeat_receiver, - {?MODULE, start_heartbeat_receiver, [Sock, TimeoutSec]}, - permanent, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {?MODULE, start_heartbeat_receiver, [Parent, Sock, TimeoutSec]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}), {Sender, Receiver}. -start_heartbeat_sender(Sock, TimeoutSec) -> +start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - Parent = self(), {ok, proc_lib:spawn_link( - fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, - send_oct, 0, + fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), continue - end}, Parent) + end}, {0, 0}) end)}. -start_heartbeat_receiver(Sock, TimeoutSec) -> +start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - Parent = self(), {ok, proc_lib:spawn_link( - fun () -> heartbeater({Sock, TimeoutSec * 1000, - recv_oct, 1, - fun () -> exit(timeout) end}, Parent) end)}. + fun () -> heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, + fun () -> + Parent ! timeout, + stop + end}, {0, 0}) end)}. pause_monitor(none) -> ok; @@ -110,19 +112,12 @@ resume_monitor({_Sender, Receiver}) -> %%---------------------------------------------------------------------------- -heartbeater(Params, Parent) -> - heartbeater(Params, erlang:monitor(process, Parent), {0, 0}). - heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, - MonitorRef, {StatVal, SameCount}) -> - Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, + {StatVal, SameCount}) -> + Recurse = fun (V) -> heartbeater(Params, V) end, receive - {'DOWN', MonitorRef, process, _Object, _Info} -> - ok; pause -> receive - {'DOWN', MonitorRef, process, _Object, _Info} -> - ok; resume -> Recurse({0, 0}); Other -> @@ -139,6 +134,7 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, Recurse({NewStatVal, SameCount + 1}); true -> case Handler() of + stop -> ok; continue -> Recurse({NewStatVal, 0}) end end; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 813ccc75..fccf5f34 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -35,7 +35,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --export([start_link/2, shutdown/1]). +-export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1]). @@ -46,7 +46,6 @@ -type(maybe_pid() :: pid() | 'undefined'). -spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok(pid())). --spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). @@ -76,12 +75,6 @@ start_link(ChPid, UnackedMsgCount) -> gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). -shutdown(undefined) -> - ok; -shutdown(LimiterPid) -> - true = unlink(LimiterPid), - gen_server2:cast(LimiterPid, shutdown). - limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 6e336c86..e0f4d6ec 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -372,6 +372,8 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> true -> throw({handshake_timeout, State#v1.callback}) end; + timeout -> + throw({timeout, State#v1.connection_state}); {'$gen_call', From, {shutdown, Explanation}} -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), @@ -460,9 +462,9 @@ handle_channel_exit(Channel, Reason, State) -> handle_dependent_exit(Pid, Reason, State) -> case (case Reason of + normal -> controlled; shutdown -> controlled; {shutdown, _Term} -> controlled; - normal -> controlled; _ -> uncontrolled end) of controlled -> @@ -878,14 +880,14 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, - {ok, _ChanSup, FrChPid} = + {ok, _ChanSup, ChPid} = rabbit_channel_sup_sup:start_channel( ChanSupSup, [Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector]), - link(FrChPid), - put({channel, Channel}, {chpid, FrChPid}), - put({chpid, FrChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(FrChPid, AnalyzedFrame). + link(ChPid), + put({channel, Channel}, {chpid, ChPid}), + put({chpid, ChPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 483b46f7..faeec406 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/4, start_link/4, flush/1, mainloop/1]). +-export([start/5, start_link/5, flush/1, mainloop/2, mainloop1/2]). -export([send_command/2, send_command/3, send_command_and_signal_back/3, send_command_and_signal_back/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). @@ -48,13 +48,13 @@ -ifdef(use_specs). --spec(start/4 :: +-spec(start/5 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol()) + non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). --spec(start_link/4 :: +-spec(start_link/5 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol()) + non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). @@ -85,25 +85,35 @@ %%---------------------------------------------------------------------------- -start(Sock, Channel, FrameMax, Protocol) -> +start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> {ok, - proc_lib:spawn(?MODULE, mainloop, [#wstate{sock = Sock, + proc_lib:spawn(?MODULE, mainloop, [ReaderPid, + #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}])}. -start_link(Sock, Channel, FrameMax, Protocol) -> +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> {ok, - proc_lib:spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, + proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid, + #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}])}. -mainloop(State) -> +mainloop(ReaderPid, State) -> + try + mainloop1(ReaderPid, State) + catch + exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error} + end, + done. + +mainloop1(ReaderPid, State) -> receive - Message -> ?MODULE:mainloop(handle_message(Message, State)) + Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [State]) + erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) end. handle_message({send_command, MethodRecord}, |