summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-01 14:45:10 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-01 14:45:10 +0000
commit96998d05cb0de99c46d60ea8c1388b6ec66ffa95 (patch)
treefc2aff5c11fb79a372cee05dec9574080d2e6413
parent443129723dda65334ec0ca899fd6b48970fbadea (diff)
downloadrabbitmq-server-96998d05cb0de99c46d60ea8c1388b6ec66ffa95.tar.gz
ensure connection stats emission for write-only connections
We set up a stats timer in the writers but rather emitting any stats directly from there we just get them to 'ping' the reader, which will then emit stats based on its own timer. The timer in the writer is created when a socket operation has been confirmed. a little bit of drive-by refactoring to make implementation easier: - move state creation into one place - move reader into state TODO: suppress all this when in the Erlang client
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_writer.erl64
2 files changed, 36 insertions, 30 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 2c1eeb91..829e9e52 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -342,6 +342,8 @@ handle_other({'$gen_cast', force_event_refresh}, Deb, State)
handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
%% Ignore, we will emit a created event once we start running.
mainloop(Deb, State);
+handle_other(ensure_stats, Deb, State) ->
+ mainloop(Deb, ensure_stats_timer(State));
handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index f3a8cacf..c23f46f9 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -18,13 +18,17 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/5, start_link/5, mainloop/2, mainloop1/2]).
+-export([start/5, start_link/5]).
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
send_command_and_notify/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
--record(wstate, {sock, channel, frame_max, protocol, pending}).
+%% internal
+-export([mainloop/1, mainloop1/1]).
+
+-record(wstate, {sock, channel, frame_max, protocol, reader,
+ stats_timer, pending}).
-define(HIBERNATE_AFTER, 5000).
@@ -67,50 +71,47 @@
non_neg_integer(), rabbit_types:protocol())
-> 'ok').
--spec(mainloop/2 :: (_,_) -> 'done').
--spec(mainloop1/2 :: (_,_) -> any()).
-
-endif.
%%---------------------------------------------------------------------------
start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- {ok,
- proc_lib:spawn(?MODULE, mainloop, [ReaderPid,
- #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol,
- pending = []}])}.
+ State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid),
+ {ok, proc_lib:spawn(?MODULE, mainloop, [State])}.
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- {ok,
- proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid,
- #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol,
- pending = []}])}.
-
-mainloop(ReaderPid, State) ->
+ State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid),
+ {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}.
+
+initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ rabbit_event:init_stats_timer(#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol,
+ reader = ReaderPid,
+ pending = []},
+ #wstate.stats_timer).
+
+mainloop(State) ->
try
- mainloop1(ReaderPid, State)
+ mainloop1(State)
catch
- exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error}
+ exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
+ ReaderPid ! {channel_exit, Channel, Error}
end,
done.
-mainloop1(ReaderPid, State = #wstate{pending = []}) ->
+mainloop1(State = #wstate{pending = []}) ->
receive
- Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(handle_message(Message, State))
after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
+ erlang:hibernate(?MODULE, mainloop, [State])
end;
-mainloop1(ReaderPid, State) ->
+mainloop1(State) ->
receive
- Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(handle_message(Message, State))
after 0 ->
- ?MODULE:mainloop1(ReaderPid, flush(State))
+ ?MODULE:mainloop1(flush(State))
end.
handle_message({send_command, MethodRecord}, State) ->
@@ -139,9 +140,12 @@ handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QPid),
State;
handle_message({inet_reply, _, ok}, State) ->
- State;
+ rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
+handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
+ ReaderPid ! ensure_stats,
+ rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).