summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-01-13 13:16:46 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-01-13 13:16:46 +0000
commita97dac21a8ecd7fa57a001f90a6986b9cd2e5958 (patch)
tree7c5d8eedc47ff84ec504e4b5b7afeefc49c4bea7
parent3be7bd14197d914c14c31cae3401bb2175868361 (diff)
downloadrabbitmq-server-a97dac21a8ecd7fa57a001f90a6986b9cd2e5958.tar.gz
simplified running<->blocking/blocked state transitions
also more sensible info items
-rw-r--r--docs/rabbitmqctl.1.xml29
-rw-r--r--src/rabbit_control.erl3
-rw-r--r--src/rabbit_reader.erl103
3 files changed, 55 insertions, 80 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index c243a3ee..4100864e 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1069,27 +1069,24 @@
</varlistentry>
<varlistentry>
- <term>time_since_flow_ctl</term>
- <listitem><para>Time since RabbitMQ last imposed flow
- control on this connection as it was attempting to
- publish too fast for the server to keep
- up. Connections which are flow controlled can still
- publish but at a reduced rate.</para></listitem>
- </varlistentry>
- <varlistentry>
- <term>mem_blocked</term>
- <listitem><para>Whether this connection is blocked due
- to the memory high watermark having been exceeded. One
- of <command>true</command>, <command>false</command>
- or <command>if_publish</command>. Connections will be
- blocked if the watermark is exceeded and they attempt
- to publish a message.</para></listitem>
+ <term>last_blocked_by</term>
+ <listitem><para>The reason for which this connection
+ was last blocked. One of 'mem' - due to a memory
+ alarm, 'flow' - due to internal flow control, or
+ 'none' if the connection was never
+ blocked.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>last_blocked_age</term>
+ <listitem><para>Time, in seconds, since this
+ connection was last blocked, or
+ 'infinity'.</para></listitem>
</varlistentry>
<varlistentry>
<term>state</term>
<listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
- <command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
+ <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
</varlistentry>
<varlistentry>
<term>channels</term>
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 9765e619..20486af5 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -307,8 +307,7 @@ action(list_bindings, Node, Args, Opts, Inform) ->
action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port,
- time_since_flow_ctl, mem_blocked]),
+ ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 07cf688f..cf6bf6fa 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -40,10 +40,10 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state, blockers, last_flow_ctl_at}).
+ auth_mechanism, auth_state, conserve_memory, last_blocked}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, mem_blocked, time_since_flow_ctl,
+ send_pend, state, last_blocked_by, last_blocked_age,
channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
@@ -222,8 +222,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
buf_len = 0,
auth_mechanism = none,
auth_state = none,
- blockers = sets:new(),
- last_flow_ctl_at = never},
+ conserve_memory = false,
+ last_blocked = never},
try
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
@@ -280,7 +280,7 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end.
handle_other({conserve_memory, Conserve}, Deb, State) ->
- recvloop(Deb, update_blockers(Conserve, mem, State));
+ recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -346,7 +346,7 @@ handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
handle_other({bump_credit, Msg}, Deb, State) ->
rabbit_flow:handle_bump_msg(Msg),
- recvloop(Deb, update_blockers(false, self(), State));
+ recvloop(Deb, control_throttle(State));
handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
@@ -361,34 +361,28 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-update_blockers(true, Blocker, State = #v1{connection_state = running,
- blockers = Blockers}) ->
- 0 = sets:size(Blockers), %% ASSERT
- State#v1{connection_state = blocking,
- blockers = sets:add_element(Blocker, Blockers)};
-update_blockers(true, Blocker, State = #v1{blockers = Blockers}) ->
- State#v1{blockers = sets:add_element(Blocker, Blockers)};
-update_blockers(false, Blocker, State = #v1{connection_state = blocking}) ->
- remove_blocker(Blocker, State);
-update_blockers(false, Blocker, State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}) ->
- State1 = remove_blocker(Blocker, State),
- case State1#v1.connection_state of
- running -> ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- State1;
- _ -> State1
- end;
-update_blockers(_Conserve, _Blocker, State) ->
+control_throttle(State = #v1{connection_state = CS,
+ conserve_memory = Mem}) ->
+ State#v1{connection_state =
+ case {CS, Mem orelse rabbit_flow:blocked()} of
+ {running, true} -> blocking;
+ {blocking, false} -> running;
+ {blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
+ State#v1.heartbeater),
+ running;
+ {_, _} -> CS
+ end}.
+
+maybe_block(State = #v1{connection_state = blocking}) ->
+ ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
+ State#v1{connection_state = blocked,
+ last_blocked = {case State#v1.conserve_memory of
+ true -> mem;
+ false -> flow
+ end, erlang:now()}};
+maybe_block(State) ->
State.
-remove_blocker(Blocker, State = #v1{blockers = Blockers}) ->
- NewBlockers = sets:del_element(Blocker, Blockers),
- case sets:size(NewBlockers) of
- 0 -> State#v1{connection_state = running,
- blockers = NewBlockers};
- _ -> State#v1{blockers = NewBlockers}
- end.
-
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -514,13 +508,8 @@ handle_frame(Type, Channel, Payload,
AnalyzedFrame, self(),
Channel, ChPid, FramingState),
put({channel, Channel}, {ChPid, NewAState}),
- State1 = case rabbit_flow:blocked() of
- true -> S = State#v1{last_flow_ctl_at =
- erlang:now()},
- update_blockers(true, self(), S);
- false -> State
- end,
- post_process_frame(AnalyzedFrame, ChPid, State1);
+ post_process_frame(AnalyzedFrame, ChPid,
+ control_throttle(State));
undefined ->
case ?IS_RUNNING(State) of
true -> send_to_new_channel(
@@ -540,12 +529,7 @@ post_process_frame({method, MethodName, _}, _ChPid,
protocol = Protocol}}) ->
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
- case State#v1.connection_state of
- blocking -> ok = rabbit_heartbeat:pause_monitor(
- State#v1.heartbeater),
- State#v1{connection_state = blocked};
- _ -> State
- end;
+ maybe_block(State);
false -> State
end;
post_process_frame(_Frame, _ChPid, State) ->
@@ -717,11 +701,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- State1 = update_blockers(
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- mem,
+ Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State1 = control_throttle(
State#v1{connection_state = running,
- connection = NewConnection}),
+ connection = NewConnection,
+ conserve_memory = Conserve}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -851,21 +835,16 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= send_pend ->
socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end,
fun ([{_, I}]) -> I end);
-i(state, State) when ?IS_RUNNING(State)->
- running;
i(state, #v1{connection_state = S}) ->
S;
-i(mem_blocked, #v1{connection_state = S,
- blockers = Blockers}) ->
- case {sets:is_element(mem, Blockers), S} of
- {true, blocking} -> if_publish;
- {true, blocked} -> true;
- {false, _} -> false
- end;
-i(time_since_flow_ctl, #v1{last_flow_ctl_at = never}) ->
- never;
-i(time_since_flow_ctl, #v1{last_flow_ctl_at = T1}) ->
- timer:now_diff(erlang:now(), T1) / 1000000;
+i(last_blocked_by, #v1{last_blocked = never}) ->
+ none;
+i(last_blocked_by, #v1{last_blocked = {By, _}}) ->
+ By;
+i(last_blocked_age, #v1{last_blocked = never}) ->
+ infinity;
+i(last_blocked_age, #v1{last_blocked = {_, T}}) ->
+ timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) ->
length(all_channels());
i(protocol, #v1{connection = #connection{protocol = none}}) ->