diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-13 13:16:46 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-13 13:16:46 +0000 |
commit | a97dac21a8ecd7fa57a001f90a6986b9cd2e5958 (patch) | |
tree | 7c5d8eedc47ff84ec504e4b5b7afeefc49c4bea7 | |
parent | 3be7bd14197d914c14c31cae3401bb2175868361 (diff) | |
download | rabbitmq-server-a97dac21a8ecd7fa57a001f90a6986b9cd2e5958.tar.gz |
simplified running<->blocking/blocked state transitions
also more sensible info items
-rw-r--r-- | docs/rabbitmqctl.1.xml | 29 | ||||
-rw-r--r-- | src/rabbit_control.erl | 3 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 103 |
3 files changed, 55 insertions, 80 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index c243a3ee..4100864e 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1069,27 +1069,24 @@ </varlistentry> <varlistentry> - <term>time_since_flow_ctl</term> - <listitem><para>Time since RabbitMQ last imposed flow - control on this connection as it was attempting to - publish too fast for the server to keep - up. Connections which are flow controlled can still - publish but at a reduced rate.</para></listitem> - </varlistentry> - <varlistentry> - <term>mem_blocked</term> - <listitem><para>Whether this connection is blocked due - to the memory high watermark having been exceeded. One - of <command>true</command>, <command>false</command> - or <command>if_publish</command>. Connections will be - blocked if the watermark is exceeded and they attempt - to publish a message.</para></listitem> + <term>last_blocked_by</term> + <listitem><para>The reason for which this connection + was last blocked. One of 'mem' - due to a memory + alarm, 'flow' - due to internal flow control, or + 'none' if the connection was never + blocked.</para></listitem> + </varlistentry> + <varlistentry> + <term>last_blocked_age</term> + <listitem><para>Time, in seconds, since this + connection was last blocked, or + 'infinity'.</para></listitem> </varlistentry> <varlistentry> <term>state</term> <listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>, - <command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> + <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> </varlistentry> <varlistentry> <term>channels</term> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 9765e619..20486af5 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -307,8 +307,7 @@ action(list_bindings, Node, Args, Opts, Inform) -> action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), - ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, - time_since_flow_ctl, mem_blocked]), + ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 07cf688f..cf6bf6fa 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -40,10 +40,10 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state, blockers, last_flow_ctl_at}). + auth_mechanism, auth_state, conserve_memory, last_blocked}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, - send_pend, state, mem_blocked, time_since_flow_ctl, + send_pend, state, last_blocked_by, last_blocked_age, channels]). -define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, @@ -222,8 +222,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, buf_len = 0, auth_mechanism = none, auth_state = none, - blockers = sets:new(), - last_flow_ctl_at = never}, + conserve_memory = false, + last_blocked = never}, try recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), @@ -280,7 +280,7 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end. handle_other({conserve_memory, Conserve}, Deb, State) -> - recvloop(Deb, update_blockers(Conserve, mem, State)); + recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve})); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -346,7 +346,7 @@ handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); handle_other({bump_credit, Msg}, Deb, State) -> rabbit_flow:handle_bump_msg(Msg), - recvloop(Deb, update_blockers(false, self(), State)); + recvloop(Deb, control_throttle(State)); handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). @@ -361,34 +361,28 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -update_blockers(true, Blocker, State = #v1{connection_state = running, - blockers = Blockers}) -> - 0 = sets:size(Blockers), %% ASSERT - State#v1{connection_state = blocking, - blockers = sets:add_element(Blocker, Blockers)}; -update_blockers(true, Blocker, State = #v1{blockers = Blockers}) -> - State#v1{blockers = sets:add_element(Blocker, Blockers)}; -update_blockers(false, Blocker, State = #v1{connection_state = blocking}) -> - remove_blocker(Blocker, State); -update_blockers(false, Blocker, State = #v1{connection_state = blocked, - heartbeater = Heartbeater}) -> - State1 = remove_blocker(Blocker, State), - case State1#v1.connection_state of - running -> ok = rabbit_heartbeat:resume_monitor(Heartbeater), - State1; - _ -> State1 - end; -update_blockers(_Conserve, _Blocker, State) -> +control_throttle(State = #v1{connection_state = CS, + conserve_memory = Mem}) -> + State#v1{connection_state = + case {CS, Mem orelse rabbit_flow:blocked()} of + {running, true} -> blocking; + {blocking, false} -> running; + {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( + State#v1.heartbeater), + running; + {_, _} -> CS + end}. + +maybe_block(State = #v1{connection_state = blocking}) -> + ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), + State#v1{connection_state = blocked, + last_blocked = {case State#v1.conserve_memory of + true -> mem; + false -> flow + end, erlang:now()}}; +maybe_block(State) -> State. -remove_blocker(Blocker, State = #v1{blockers = Blockers}) -> - NewBlockers = sets:del_element(Blocker, Blockers), - case sets:size(NewBlockers) of - 0 -> State#v1{connection_state = running, - blockers = NewBlockers}; - _ -> State#v1{blockers = NewBlockers} - end. - close_connection(State = #v1{queue_collector = Collector, connection = #connection{ timeout_sec = TimeoutSec}}) -> @@ -514,13 +508,8 @@ handle_frame(Type, Channel, Payload, AnalyzedFrame, self(), Channel, ChPid, FramingState), put({channel, Channel}, {ChPid, NewAState}), - State1 = case rabbit_flow:blocked() of - true -> S = State#v1{last_flow_ctl_at = - erlang:now()}, - update_blockers(true, self(), S); - false -> State - end, - post_process_frame(AnalyzedFrame, ChPid, State1); + post_process_frame(AnalyzedFrame, ChPid, + control_throttle(State)); undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -540,12 +529,7 @@ post_process_frame({method, MethodName, _}, _ChPid, protocol = Protocol}}) -> case Protocol:method_has_content(MethodName) of true -> erlang:bump_reductions(2000), - case State#v1.connection_state of - blocking -> ok = rabbit_heartbeat:pause_monitor( - State#v1.heartbeater), - State#v1{connection_state = blocked}; - _ -> State - end; + maybe_block(State); false -> State end; post_process_frame(_Frame, _ChPid, State) -> @@ -717,11 +701,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - State1 = update_blockers( - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - mem, + Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + State1 = control_throttle( State#v1{connection_state = running, - connection = NewConnection}), + connection = NewConnection, + conserve_memory = Conserve}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), @@ -851,21 +835,16 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; SockStat =:= send_pend -> socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end, fun ([{_, I}]) -> I end); -i(state, State) when ?IS_RUNNING(State)-> - running; i(state, #v1{connection_state = S}) -> S; -i(mem_blocked, #v1{connection_state = S, - blockers = Blockers}) -> - case {sets:is_element(mem, Blockers), S} of - {true, blocking} -> if_publish; - {true, blocked} -> true; - {false, _} -> false - end; -i(time_since_flow_ctl, #v1{last_flow_ctl_at = never}) -> - never; -i(time_since_flow_ctl, #v1{last_flow_ctl_at = T1}) -> - timer:now_diff(erlang:now(), T1) / 1000000; +i(last_blocked_by, #v1{last_blocked = never}) -> + none; +i(last_blocked_by, #v1{last_blocked = {By, _}}) -> + By; +i(last_blocked_age, #v1{last_blocked = never}) -> + infinity; +i(last_blocked_age, #v1{last_blocked = {_, T}}) -> + timer:now_diff(erlang:now(), T) / 1000000; i(channels, #v1{}) -> length(all_channels()); i(protocol, #v1{connection = #connection{protocol = none}}) -> |