diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-01 14:45:10 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-01 14:45:10 +0000 |
commit | 96998d05cb0de99c46d60ea8c1388b6ec66ffa95 (patch) | |
tree | fc2aff5c11fb79a372cee05dec9574080d2e6413 | |
parent | 443129723dda65334ec0ca899fd6b48970fbadea (diff) | |
download | rabbitmq-server-96998d05cb0de99c46d60ea8c1388b6ec66ffa95.tar.gz |
ensure connection stats emission for write-only connections
We set up a stats timer in the writers but rather emitting any stats
directly from there we just get them to 'ping' the reader, which will
then emit stats based on its own timer.
The timer in the writer is created when a socket operation has been
confirmed.
a little bit of drive-by refactoring to make implementation easier:
- move state creation into one place
- move reader into state
TODO: suppress all this when in the Erlang client
-rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 64 |
2 files changed, 36 insertions, 30 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 2c1eeb91..829e9e52 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -342,6 +342,8 @@ handle_other({'$gen_cast', force_event_refresh}, Deb, State) handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> %% Ignore, we will emit a created event once we start running. mainloop(Deb, State); +handle_other(ensure_stats, Deb, State) -> + mainloop(Deb, ensure_stats_timer(State)); handle_other(emit_stats, Deb, State) -> mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index f3a8cacf..c23f46f9 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -18,13 +18,17 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/5, start_link/5, mainloop/2, mainloop1/2]). +-export([start/5, start_link/5]). -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, send_command_and_notify/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). --record(wstate, {sock, channel, frame_max, protocol, pending}). +%% internal +-export([mainloop/1, mainloop1/1]). + +-record(wstate, {sock, channel, frame_max, protocol, reader, + stats_timer, pending}). -define(HIBERNATE_AFTER, 5000). @@ -67,50 +71,47 @@ non_neg_integer(), rabbit_types:protocol()) -> 'ok'). --spec(mainloop/2 :: (_,_) -> 'done'). --spec(mainloop1/2 :: (_,_) -> any()). - -endif. %%--------------------------------------------------------------------------- start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - {ok, - proc_lib:spawn(?MODULE, mainloop, [ReaderPid, - #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol, - pending = []}])}. + State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid), + {ok, proc_lib:spawn(?MODULE, mainloop, [State])}. start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - {ok, - proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid, - #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol, - pending = []}])}. - -mainloop(ReaderPid, State) -> + State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid), + {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}. + +initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid) -> + rabbit_event:init_stats_timer(#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol, + reader = ReaderPid, + pending = []}, + #wstate.stats_timer). + +mainloop(State) -> try - mainloop1(ReaderPid, State) + mainloop1(State) catch - exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error} + exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State, + ReaderPid ! {channel_exit, Channel, Error} end, done. -mainloop1(ReaderPid, State = #wstate{pending = []}) -> +mainloop1(State = #wstate{pending = []}) -> receive - Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) + Message -> ?MODULE:mainloop1(handle_message(Message, State)) after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) + erlang:hibernate(?MODULE, mainloop, [State]) end; -mainloop1(ReaderPid, State) -> +mainloop1(State) -> receive - Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) + Message -> ?MODULE:mainloop1(handle_message(Message, State)) after 0 -> - ?MODULE:mainloop1(ReaderPid, flush(State)) + ?MODULE:mainloop1(flush(State)) end. handle_message({send_command, MethodRecord}, State) -> @@ -139,9 +140,12 @@ handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> rabbit_amqqueue:notify_sent_queue_down(QPid), State; handle_message({inet_reply, _, ok}, State) -> - State; + rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats); handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); +handle_message(emit_stats, State = #wstate{reader = ReaderPid}) -> + ReaderPid ! ensure_stats, + rabbit_event:reset_stats_timer(State, #wstate.stats_timer); handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). |