summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_reader.erl60
1 files changed, 34 insertions, 26 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a21a061b..154fa394 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -37,14 +37,15 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- conserve_resources, last_blocked_by, last_blocked_at}).
+ channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, vhost,
client_properties, capabilities,
auth_mechanism, auth_state}).
+-record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}).
+
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
channels]).
@@ -233,9 +234,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
- conserve_resources = false,
- last_blocked_by = none,
- last_blocked_at = never},
+ throttle = #throttle{
+ conserve_resources = false,
+ last_blocked_by = none,
+ last_blocked_at = never}},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
@@ -291,8 +293,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{other, Other} -> handle_other(Other, Deb, State)
end.
-handle_other({conserve_resources, Conserve}, Deb, State) ->
- recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
+handle_other({conserve_resources, Conserve}, Deb,
+ State = #v1{throttle = Throttle}) ->
+ Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ recvloop(Deb, control_throttle(State#v1{throttle = Throttle1}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -375,29 +379,31 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-control_throttle(State = #v1{connection_state = CS,
- conserve_resources = Mem}) ->
- case {CS, Mem orelse credit_flow:blocked()} of
+control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
+ case {CS, (Throttle#throttle.conserve_resources orelse
+ credit_flow:blocked())} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
State#v1{connection_state = running};
- {blocked, true} -> update_last_blocked_by(State);
+ {blocked, true} -> State#v1{throttle = update_last_blocked_by(
+ Throttle)};
{_, _} -> State
end.
-maybe_block(State = #v1{connection_state = blocking}) ->
+maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
- update_last_blocked_by(State#v1{connection_state = blocked,
- last_blocked_at = erlang:now()});
+ State#v1{connection_state = blocked,
+ throttle = update_last_blocked_by(
+ Throttle#throttle{last_blocked_at = erlang:now()})};
maybe_block(State) ->
State.
-update_last_blocked_by(State = #v1{conserve_resources = true}) ->
- State#v1{last_blocked_by = resource};
-update_last_blocked_by(State = #v1{conserve_resources = false}) ->
- State#v1{last_blocked_by = flow}.
+update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) ->
+ Throttle#throttle{last_blocked_by = resource};
+update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) ->
+ Throttle#throttle{last_blocked_by = flow}.
%%--------------------------------------------------------------------------
%% error handling / termination
@@ -794,10 +800,11 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
- connection = Connection = #connection{
- user = User,
- protocol = Protocol},
- sock = Sock}) ->
+ connection = Connection = #connection{
+ user = User,
+ protocol = Protocol},
+ sock = Sock,
+ throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -805,7 +812,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
State1 = control_throttle(
State#v1{connection_state = running,
connection = NewConnection,
- conserve_resources = Conserve}),
+ throttle = Throttle#throttle{
+ conserve_resources = Conserve}}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -923,10 +931,10 @@ i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
i(state, #v1{connection_state = CS}) -> CS;
-i(last_blocked_by, #v1{last_blocked_by = By}) -> By;
-i(last_blocked_age, #v1{last_blocked_at = never}) ->
+i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By;
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
infinity;
-i(last_blocked_age, #v1{last_blocked_at = T}) ->
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) -> length(all_channels());
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).