diff options
author | Michael Klishin <michael@rabbitmq.com> | 2013-12-11 15:47:52 +0400 |
---|---|---|
committer | Michael Klishin <michael@rabbitmq.com> | 2013-12-11 15:47:52 +0400 |
commit | dce68816c9f87f8a21d4c0fc85a2ed22593e3898 (patch) | |
tree | 294f7b9af5fbdc35c926fae06561f3e5283dd6fe | |
parent | 9d7ca484130d69d6755fda31f83b061974055e33 (diff) | |
download | rabbitmq-server-dce68816c9f87f8a21d4c0fc85a2ed22593e3898.tar.gz |
Track # of open channels with a counter in connection state
-rw-r--r-- | src/rabbit_reader.erl | 81 |
1 files changed, 47 insertions, 34 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 2b33293d..bd4b7862 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -43,7 +43,8 @@ -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, channel_max, vhost, client_properties, capabilities, - auth_mechanism, auth_state}). + auth_mechanism, auth_state, + channel_count}). -record(throttle, {alarmed_by, last_blocked_by, last_blocked_at, blocked_sent}). @@ -604,28 +605,30 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) -> create_channel(Channel, State) -> #v1{sock = Sock, queue_collector = Collector, channel_sup_sup_pid = ChanSupSup, - connection = #connection{name = Name, - protocol = Protocol, - frame_max = FrameMax, - channel_max = ChannelMax, - user = User, - vhost = VHost, - capabilities = Capabilities}} = State, - N = length(all_channels()), - case ChannelMax == 0 orelse N < ChannelMax of - true -> {ok, _ChSupPid, {ChPid, AState}} = + connection = Conn} = State, + #connection{name = Name, + protocol = Protocol, + frame_max = FrameMax, + channel_max = ChannelMax, + channel_count = ChannelCount, + user = User, + vhost = VHost, + capabilities = Capabilities} = Conn, + case ChannelMax == 0 orelse ChannelCount < ChannelMax of + true -> {ok, _ChSupPid, {ChPid, ChState}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), + State1 = State#v1{connection = + Conn#connection{channel_count = (ChannelCount + 1)}}, put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ChPid, AState}), - {ok, {ChPid, AState}}; + {ok, {ChPid, ChState}, State1}; false -> {error, rabbit_misc:amqp_error( not_allowed, "number of channels opened (~w) has " "reached the negotiated channel_max (~w)", - [N, ChannelMax], 'none')} + [ChannelCount, ChannelMax], 'none')} end. channel_cleanup(ChPid) -> @@ -674,27 +677,35 @@ handle_frame(Type, Channel, Payload, State) -> process_frame(Frame, Channel, State) -> ChKey = {channel, Channel}, - case (case get(ChKey) of - undefined -> create_channel(Channel, State); - Other -> {ok, Other} - end) of + ChRes = case get(ChKey) of + undefined -> + case create_channel(Channel, State) of + {ok, ChVal, ConnState} -> + put(ChKey, ChVal), + {ok, ChVal, ConnState}; + {error, E} -> + {error, E} + end; + Other -> {ok, Other, State} + end, + case ChRes of {error, Error} -> handle_exception(State, Channel, Error); - {ok, {ChPid, AState}} -> - case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, NewAState} -> + {ok, {ChPid, ChState}, State1} -> + case rabbit_command_assembler:process(Frame, ChState) of + {ok, NewChState} -> + put(ChKey, {ChPid, NewChState}), + post_process_frame(Frame, ChPid, State1); + {ok, Method, NewChState} -> rabbit_channel:do(ChPid, Method), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, Content, NewAState} -> + put(ChKey, {ChPid, NewChState}), + post_process_frame(Frame, ChPid, State1); + {ok, Method, Content, NewChState} -> rabbit_channel:do_flow(ChPid, Method, Content), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, control_throttle(State)); + put(ChKey, {ChPid, NewChState}), + post_process_frame(Frame, ChPid, control_throttle(State1)); {error, Reason} -> - handle_exception(State, Channel, Reason) + handle_exception(State1, Channel, Reason) end end. @@ -874,9 +885,10 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ - frame_max = FrameMax, - channel_max = ChannelMax, - timeout_sec = ClientHeartbeat}, + frame_max = FrameMax, + channel_max = ChannelMax, + channel_count = 0, + timeout_sec = ClientHeartbeat}, queue_collector = Collector, heartbeater = Heartbeater}; @@ -1048,7 +1060,8 @@ i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) -> infinity; i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) -> timer:now_diff(erlang:now(), T) / 1000000; -i(channels, #v1{}) -> length(all_channels()); +i(channels, #v1{connection = Conn}) -> + Conn#connection.channel_count; i(Item, #v1{connection = Conn}) -> ic(Item, Conn). ic(name, #connection{name = Name}) -> Name; |