summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-12 19:02:09 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-12 19:02:09 +0000
commitede8e1c8ae62b66e32c93d86854db333d3695480 (patch)
treea2e4df0b3724662bc92a68ae1dab2a1710f29b84
parent280ff6fd1b143b7e5213b8d0f4d43f003aa4e375 (diff)
downloadrabbitmq-server-ede8e1c8ae62b66e32c93d86854db333d3695480.tar.gz
Attempt to show information about blocking due to memory alarm and flow control in a more comprehensible manner.
-rw-r--r--docs/rabbitmqctl.1.xml24
-rw-r--r--src/rabbit_control.erl3
-rw-r--r--src/rabbit_reader.erl25
3 files changed, 45 insertions, 7 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 15755038..2594f9af 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1067,6 +1067,25 @@
<listitem><para>The period for which the peer's SSL
certificate is valid.</para></listitem>
</varlistentry>
+
+ <varlistentry>
+ <term>time_since_flow_ctl</term>
+ <listitem><para>Time since RabbitMQ last imposed flow
+ control on this connection as it was attempting to
+ publish too fast for the server to keep
+ up. Connections which are flow controlled can still
+ publish but at a reduced rate.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>mem_blocked</term>
+ <listitem><para>Whether this connection is blocked due
+ to the memory high watermark having been exceeded. One
+ of <command>true</command>, <command>false</command>
+ or <command>if_publish</command>. Connections will be
+ blocked if the watermark is exceeded and they attempt
+ to publish a message.</para></listitem>
+ </varlistentry>
+
<varlistentry>
<term>state</term>
<listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
@@ -1127,8 +1146,9 @@
</varlistentry>
</variablelist>
<para>
- If no <command>connectioninfoitem</command>s are specified then user, peer
- address, peer port and connection state are displayed.
+ If no <command>connectioninfoitem</command>s are
+ specified then user, peer address, peer port, time since
+ flow control and memory block state are displayed.
</para>
<para role="example-prefix">
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 20486af5..9765e619 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -307,7 +307,8 @@ action(list_bindings, Node, Args, Opts, Inform) ->
action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
+ ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port,
+ time_since_flow_ctl, mem_blocked]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 57aa880b..f3c71b9b 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -40,10 +40,11 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state, blockers}).
+ auth_mechanism, auth_state, blockers, last_flow_ctl_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, channels]).
+ send_pend, state, mem_blocked, time_since_flow_ctl,
+ channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
@@ -221,7 +222,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
buf_len = 0,
auth_mechanism = none,
auth_state = none,
- blockers = sets:new()},
+ blockers = sets:new(),
+ last_flow_ctl_at = never},
try
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
@@ -527,7 +529,9 @@ handle_frame(Type, Channel, Payload,
Channel, ChPid, FramingState),
put({channel, Channel}, {ChPid, NewAState}),
State1 = case rabbit_flow:blocked() of
- true -> update_blockers(true, self(), State);
+ true -> S = State#v1{last_flow_ctl_at =
+ erlang:now()},
+ update_blockers(true, self(), S);
false -> State
end,
post_process_frame(AnalyzedFrame, ChPid, State1);
@@ -859,8 +863,21 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= send_pend ->
socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end,
fun ([{_, I}]) -> I end);
+i(state, State) when ?IS_RUNNING(State)->
+ running;
i(state, #v1{connection_state = S}) ->
S;
+i(mem_blocked, #v1{connection_state = S,
+ blockers = Blockers}) ->
+ case {sets:is_element(mem, Blockers), S} of
+ {true, blocking} -> if_publish;
+ {true, blocked} -> true;
+ {false, _} -> false
+ end;
+i(time_since_flow_ctl, #v1{last_flow_ctl_at = never}) ->
+ never;
+i(time_since_flow_ctl, #v1{last_flow_ctl_at = T1}) ->
+ timer:now_diff(erlang:now(), T1) / 1000000;
i(channels, #v1{}) ->
length(all_channels());
i(protocol, #v1{connection = #connection{protocol = none}}) ->