diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-02 16:09:22 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-02 16:09:22 +0000 |
commit | 600b7363321e3ab9c01ef92cfc1b3d707aeb8edd (patch) | |
tree | 974b852b72516f2cc91a61835ca3a270e930e0ad | |
parent | 22fdac5474e2746136403cdcefaafaf781fd70a9 (diff) | |
parent | 43d796ee6ee1f162f0979b87c9b4ad75f123f1f1 (diff) | |
download | rabbitmq-server-600b7363321e3ab9c01ef92cfc1b3d707aeb8edd.tar.gz |
Merge bug23225
-rw-r--r-- | src/rabbit_channel_sup.erl | 2 | ||||
-rw-r--r-- | src/rabbit_event.erl | 10 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 83 |
4 files changed, 63 insertions, 34 deletions
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index bcb83851..42459833 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -83,7 +83,7 @@ init(Type) -> child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) -> [{writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol, ReaderPid]}, + [Sock, Channel, FrameMax, Protocol, ReaderPid, true]}, intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)]; child_specs(direct) -> [{limiter, {rabbit_limiter, start_link, []}, diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 3f1b20fe..7d91b6fa 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -19,8 +19,8 @@ -include("rabbit.hrl"). -export([start_link/0]). --export([init_stats_timer/2, ensure_stats_timer/3, stop_stats_timer/2]). --export([reset_stats_timer/2]). +-export([init_stats_timer/2, init_disabled_stats_timer/2, + ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]). -export([stats_level/2, if_enabled/3]). -export([notify/2, notify_if/3]). @@ -51,6 +51,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(init_stats_timer/2 :: (container(), pos()) -> container()). +-spec(init_disabled_stats_timer/2 :: (container(), pos()) -> container()). -spec(ensure_stats_timer/3 :: (container(), pos(), term()) -> container()). -spec(stop_stats_timer/2 :: (container(), pos()) -> container()). -spec(reset_stats_timer/2 :: (container(), pos()) -> container()). @@ -90,10 +91,13 @@ start_link() -> init_stats_timer(C, P) -> {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), - {ok, Interval} = application:get_env(rabbit, collect_statistics_interval), + {ok, Interval} = application:get_env(rabbit, collect_statistics_interval), setelement(P, C, #state{level = StatsLevel, interval = Interval, timer = undefined}). +init_disabled_stats_timer(C, P) -> + setelement(P, C, #state{level = none, interval = 0, timer = undefined}). + ensure_stats_timer(C, P, Msg) -> case element(P, C) of #state{level = Level, interval = Interval, timer = undefined} = State diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 2c1eeb91..829e9e52 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -342,6 +342,8 @@ handle_other({'$gen_cast', force_event_refresh}, Deb, State) handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> %% Ignore, we will emit a created event once we start running. mainloop(Deb, State); +handle_other(ensure_stats, Deb, State) -> + mainloop(Deb, ensure_stats_timer(State)); handle_other(emit_stats, Deb, State) -> mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 0e26a8a3..a7ea3d99 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -18,13 +18,17 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/5, start_link/5, mainloop/2, mainloop1/2]). +-export([start/5, start_link/5, start/6, start_link/6]). -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, send_command_and_notify/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). --record(wstate, {sock, channel, frame_max, protocol, pending}). +%% internal +-export([mainloop/1, mainloop1/1]). + +-record(wstate, {sock, channel, frame_max, protocol, reader, + stats_timer, pending}). -define(HIBERNATE_AFTER, 5000). @@ -40,6 +44,14 @@ (rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). +-spec(start/6 :: + (rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) + -> rabbit_types:ok(pid())). +-spec(start_link/6 :: + (rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) + -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: @@ -67,50 +79,58 @@ non_neg_integer(), rabbit_types:protocol()) -> 'ok'). --spec(mainloop/2 :: (_,_) -> 'done'). --spec(mainloop1/2 :: (_,_) -> any()). - -endif. %%--------------------------------------------------------------------------- start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - {ok, - proc_lib:spawn(?MODULE, mainloop, [ReaderPid, - #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol, - pending = []}])}. + start(Sock, Channel, FrameMax, Protocol, ReaderPid, false). start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - {ok, - proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid, - #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol, - pending = []}])}. - -mainloop(ReaderPid, State) -> + start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, false). + +start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> + State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, + ReaderWantsStats), + {ok, proc_lib:spawn(?MODULE, mainloop, [State])}. + +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> + State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, + ReaderWantsStats), + {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}. + +initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> + (case ReaderWantsStats of + true -> fun rabbit_event:init_stats_timer/2; + false -> fun rabbit_event:init_disabled_stats_timer/2 + end)(#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol, + reader = ReaderPid, + pending = []}, + #wstate.stats_timer). + +mainloop(State) -> try - mainloop1(ReaderPid, State) + mainloop1(State) catch - exit:Error -> ReaderPid ! {channel_exit, State#wstate.channel, Error} + exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State, + ReaderPid ! {channel_exit, Channel, Error} end, done. -mainloop1(ReaderPid, State = #wstate{pending = []}) -> +mainloop1(State = #wstate{pending = []}) -> receive - Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) + Message -> ?MODULE:mainloop1(handle_message(Message, State)) after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) + erlang:hibernate(?MODULE, mainloop, [State]) end; -mainloop1(ReaderPid, State) -> +mainloop1(State) -> receive - Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) + Message -> ?MODULE:mainloop1(handle_message(Message, State)) after 0 -> - ?MODULE:mainloop1(ReaderPid, flush(State)) + ?MODULE:mainloop1(flush(State)) end. handle_message({send_command, MethodRecord}, State) -> @@ -139,9 +159,12 @@ handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> rabbit_amqqueue:notify_sent_queue_down(QPid), State; handle_message({inet_reply, _, ok}, State) -> - State; + rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats); handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); +handle_message(emit_stats, State = #wstate{reader = ReaderPid}) -> + ReaderPid ! ensure_stats, + rabbit_event:reset_stats_timer(State, #wstate.stats_timer); handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). |