summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@rabbitmq.com>2013-12-11 15:47:52 +0400
committerMichael Klishin <michael@rabbitmq.com>2013-12-11 15:47:52 +0400
commitdce68816c9f87f8a21d4c0fc85a2ed22593e3898 (patch)
tree294f7b9af5fbdc35c926fae06561f3e5283dd6fe
parent9d7ca484130d69d6755fda31f83b061974055e33 (diff)
downloadrabbitmq-server-dce68816c9f87f8a21d4c0fc85a2ed22593e3898.tar.gz
Track # of open channels with a counter in connection state
-rw-r--r--src/rabbit_reader.erl81
1 files changed, 47 insertions, 34 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 2b33293d..bd4b7862 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -43,7 +43,8 @@
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state,
+ channel_count}).
-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at,
blocked_sent}).
@@ -604,28 +605,30 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
create_channel(Channel, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{name = Name,
- protocol = Protocol,
- frame_max = FrameMax,
- channel_max = ChannelMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State,
- N = length(all_channels()),
- case ChannelMax == 0 orelse N < ChannelMax of
- true -> {ok, _ChSupPid, {ChPid, AState}} =
+ connection = Conn} = State,
+ #connection{name = Name,
+ protocol = Protocol,
+ frame_max = FrameMax,
+ channel_max = ChannelMax,
+ channel_count = ChannelCount,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities} = Conn,
+ case ChannelMax == 0 orelse ChannelCount < ChannelMax of
+ true -> {ok, _ChSupPid, {ChPid, ChState}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities,
Collector}),
MRef = erlang:monitor(process, ChPid),
+ State1 = State#v1{connection =
+ Conn#connection{channel_count = (ChannelCount + 1)}},
put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- {ok, {ChPid, AState}};
+ {ok, {ChPid, ChState}, State1};
false -> {error, rabbit_misc:amqp_error(
not_allowed, "number of channels opened (~w) has "
"reached the negotiated channel_max (~w)",
- [N, ChannelMax], 'none')}
+ [ChannelCount, ChannelMax], 'none')}
end.
channel_cleanup(ChPid) ->
@@ -674,27 +677,35 @@ handle_frame(Type, Channel, Payload, State) ->
process_frame(Frame, Channel, State) ->
ChKey = {channel, Channel},
- case (case get(ChKey) of
- undefined -> create_channel(Channel, State);
- Other -> {ok, Other}
- end) of
+ ChRes = case get(ChKey) of
+ undefined ->
+ case create_channel(Channel, State) of
+ {ok, ChVal, ConnState} ->
+ put(ChKey, ChVal),
+ {ok, ChVal, ConnState};
+ {error, E} ->
+ {error, E}
+ end;
+ Other -> {ok, Other, State}
+ end,
+ case ChRes of
{error, Error} ->
handle_exception(State, Channel, Error);
- {ok, {ChPid, AState}} ->
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} ->
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, NewAState} ->
+ {ok, {ChPid, ChState}, State1} ->
+ case rabbit_command_assembler:process(Frame, ChState) of
+ {ok, NewChState} ->
+ put(ChKey, {ChPid, NewChState}),
+ post_process_frame(Frame, ChPid, State1);
+ {ok, Method, NewChState} ->
rabbit_channel:do(ChPid, Method),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, Content, NewAState} ->
+ put(ChKey, {ChPid, NewChState}),
+ post_process_frame(Frame, ChPid, State1);
+ {ok, Method, Content, NewChState} ->
rabbit_channel:do_flow(ChPid, Method, Content),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, control_throttle(State));
+ put(ChKey, {ChPid, NewChState}),
+ post_process_frame(Frame, ChPid, control_throttle(State1));
{error, Reason} ->
- handle_exception(State, Channel, Reason)
+ handle_exception(State1, Channel, Reason)
end
end.
@@ -874,9 +885,10 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
SendFun, ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
connection = Connection#connection{
- frame_max = FrameMax,
- channel_max = ChannelMax,
- timeout_sec = ClientHeartbeat},
+ frame_max = FrameMax,
+ channel_max = ChannelMax,
+ channel_count = 0,
+ timeout_sec = ClientHeartbeat},
queue_collector = Collector,
heartbeater = Heartbeater};
@@ -1048,7 +1060,8 @@ i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
infinity;
i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
timer:now_diff(erlang:now(), T) / 1000000;
-i(channels, #v1{}) -> length(all_channels());
+i(channels, #v1{connection = Conn}) ->
+ Conn#connection.channel_count;
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
ic(name, #connection{name = Name}) -> Name;