diff options
author | David Ansari <david.ansari@gmx.de> | 2021-07-15 19:20:47 +0200 |
---|---|---|
committer | David Ansari <david.ansari@gmx.de> | 2021-07-15 19:29:24 +0200 |
commit | 3964da37b446c4d430f8254fe801694c3e1d9304 (patch) | |
tree | c8e1714f60efdf1dc55583fb3aa34a930635bb8a | |
parent | 8a339aae6c2761c9c906365783ddf504397063bb (diff) | |
download | rabbitmq-server-git-3964da37b446c4d430f8254fe801694c3e1d9304.tar.gz |
Close TCP connection when stream reader times out
Add state timeouts.
If the client takes more than 10s for a single step in the authentication
protocol, make the server close the TCP connection.
Also close the TCP connection if the server times out in state
close_sent. That's the case when the client sends an invalid command
(after successful authentication), the server requests the client to
close the connection, but the client doesn't respond anymore.
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 62 | ||||
-rw-r--r-- | deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl | 43 |
2 files changed, 103 insertions, 2 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 5de3ce6c8a..c17a307b61 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -146,6 +146,12 @@ peer_cert_subject, peer_cert_validity]). +-ifdef(TEST). +-define(STATE_TIMEOUT, 500). +-else. +-define(STATE_TIMEOUT, 10_000). +-endif. + %% client API -export([start_link/4, info/2, @@ -170,7 +176,7 @@ close_sent/3]). callback_mode() -> - state_functions. + [state_functions, state_enter]. terminate(Reason, State, _StatemData) -> rabbit_log:debug("~p terminating in state '~s' with reason '~p'", [?MODULE, State, Reason]). @@ -252,6 +258,13 @@ init([KeepaliveSup, [Error, Reason]) end. +tcp_connected(enter, _OldState, StateData) -> + {next_state, ?FUNCTION_NAME, StateData, {state_timeout, ?STATE_TIMEOUT, close}}; +tcp_connected(state_timeout, close, #statem_data{ + transport = Transport, + connection = #stream_connection{socket = Socket} + }) -> + state_timeout(?FUNCTION_NAME, Transport, Socket); tcp_connected(info, Msg, StateData) -> handle_info(Msg, StateData, fun (NextConnectionStep, @@ -271,6 +284,13 @@ tcp_connected(info, Msg, StateData) -> end end). +peer_properties_exchanged(enter, _OldState, StateData) -> + {next_state, ?FUNCTION_NAME, StateData, {state_timeout, ?STATE_TIMEOUT, close}}; +peer_properties_exchanged(state_timeout, close, #statem_data{ + transport = Transport, + connection = #stream_connection{socket = Socket} + }) -> + state_timeout(?FUNCTION_NAME, Transport, Socket); peer_properties_exchanged(info, Msg, StateData) -> handle_info(Msg, StateData, fun (NextConnectionStep, @@ -290,6 +310,13 @@ peer_properties_exchanged(info, Msg, StateData) -> end end). +authenticating(enter, _OldState, StateData) -> + {next_state, ?FUNCTION_NAME, StateData, {state_timeout, ?STATE_TIMEOUT, close}}; +authenticating(state_timeout, close, #statem_data{ + transport = Transport, + connection = #stream_connection{socket = Socket} + }) -> + state_timeout(?FUNCTION_NAME, Transport, Socket); authenticating(info, Msg, StateData) -> handle_info(Msg, StateData, fun(NextConnectionStep, @@ -316,6 +343,13 @@ authenticating(info, Msg, StateData) -> end end). +tuning(enter, _OldState, StateData) -> + {next_state, ?FUNCTION_NAME, StateData, {state_timeout, ?STATE_TIMEOUT, close}}; +tuning(state_timeout, close, #statem_data{ + transport = Transport, + connection = #stream_connection{socket = Socket} + }) -> + state_timeout(?FUNCTION_NAME, Transport, Socket); tuning(info, Msg, StateData) -> handle_info(Msg, StateData, fun (NextConnectionStep, @@ -339,6 +373,13 @@ tuning(info, Msg, StateData) -> end end). +tuned(enter, _OldState, StateData) -> + {next_state, ?FUNCTION_NAME, StateData, {state_timeout, ?STATE_TIMEOUT, close}}; +tuned(state_timeout, close, #statem_data{ + transport = Transport, + connection = #stream_connection{socket = Socket} + }) -> + state_timeout(?FUNCTION_NAME, Transport, Socket); tuned(info, Msg, StateData) -> handle_info(Msg, StateData, fun (NextConnectionStep, @@ -354,6 +395,11 @@ tuned(info, Msg, StateData) -> end end). +state_timeout(State, Transport, Socket) -> + rabbit_log_connection:warning("Closing connection because of timeout in state '~s'.", [State]), + close_immediately(Transport, Socket), + stop. + handle_info(Msg, #statem_data{ transport = Transport, connection = #stream_connection{socket = S, connection_step = PreviousConnectionStep} = Connection, @@ -529,6 +575,8 @@ close_immediately(Transport, S) -> Transport:shutdown(S, read), Transport:close(S). +open(enter, _OldState, StateData) -> + {next_state, ?FUNCTION_NAME, StateData}; open(info, {resource_alarm, IsThereAlarm}, #statem_data{ @@ -949,6 +997,18 @@ open(cast, {force_event_refresh, Ref}, #statem_data{ Connection2 = ensure_stats_timer(Connection1), {keep_state, StatemData#statem_data{connection = Connection2}}. +close_sent(enter, _OldState, StateData) -> + {next_state, ?FUNCTION_NAME, StateData, {state_timeout, ?STATE_TIMEOUT, close}}; +close_sent(state_timeout, close, #statem_data{ + transport = Transport, + connection = #stream_connection{socket = Socket} = Connection, + connection_state = State + }) -> + rabbit_log_connection:warning("Closing connection because of timeout in state '~s'.", [?FUNCTION_NAME]), + close(Transport, Socket, State), + rabbit_networking:unregister_non_amqp_connection(self()), + notify_connection_closed(Connection, State), + stop; close_sent(info, {tcp, S, Data}, #statem_data{ transport = Transport, connection = Connection, diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 7d1c6b576e..4488b1e59c 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -39,8 +39,12 @@ groups() -> test_gc_consumers, test_gc_publishers, unauthenticated_client_rejected_tcp_connected, + timeout_tcp_connected, unauthenticated_client_rejected_peer_properties_exchanged, - unauthenticated_client_rejected_authenticating]}, + timeout_peer_properties_exchanged, + unauthenticated_client_rejected_authenticating, + timeout_authenticating, + timeout_close_sent]}, %% Run `test_global_counters` on its own so the global metrics are %% initialised to 0 for each testcase {single_node, [], [test_global_counters]}, @@ -189,6 +193,11 @@ unauthenticated_client_rejected_tcp_connected(Config) -> ?assertEqual(ok, gen_tcp:send(S, <<"invalid data">>)), ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)). +timeout_tcp_connected(Config) -> + Port = get_stream_port(Config), + {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]), + ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)). + unauthenticated_client_rejected_peer_properties_exchanged(Config) -> Port = get_stream_port(Config), {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]), @@ -197,6 +206,13 @@ unauthenticated_client_rejected_peer_properties_exchanged(Config) -> ?assertEqual(ok, gen_tcp:send(S, <<"invalid data">>)), ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)). +timeout_peer_properties_exchanged(Config) -> + Port = get_stream_port(Config), + {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]), + C0 = rabbit_stream_core:init(0), + test_peer_properties(gen_tcp, S, C0), + ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)). + unauthenticated_client_rejected_authenticating(Config) -> Port = get_stream_port(Config), {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]), @@ -206,6 +222,31 @@ unauthenticated_client_rejected_authenticating(Config) -> ?assertEqual(ok, gen_tcp:send(S, SaslHandshakeFrame)), ?awaitMatch({error, closed}, gen_tcp:send(S, <<"invalid data">>), ?WAIT). +timeout_authenticating(Config) -> + Port = get_stream_port(Config), + {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]), + C0 = rabbit_stream_core:init(0), + test_peer_properties(gen_tcp, S, C0), + SaslHandshakeFrame = rabbit_stream_core:frame({request, 1, sasl_handshake}), + ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)). + +timeout_close_sent(Config) -> + Port = get_stream_port(Config), + {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]), + C0 = rabbit_stream_core:init(0), + C1 = test_peer_properties(gen_tcp, S, C0), + C2 = test_authenticate(gen_tcp, S, C1), + % Trigger rabbit_stream_reader to transition to state close_sent + NonExistentCommand = 999, + IOData = <<?REQUEST:1, NonExistentCommand:15, ?VERSION_1:16>>, + Size = iolist_size(IOData), + Frame = [<<Size:32>> | IOData], + ok = gen_tcp:send(S, Frame), + {{request, _CorrelationID, {close, ?RESPONSE_CODE_UNKNOWN_FRAME, <<"unknown frame">>}}, _Config} + = receive_commands(gen_tcp, S, C2), + % Now, rabbit_stream_reader is in state close_sent. + ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)). + consumer_count(Config) -> ets_count(Config, ?TABLE_CONSUMER). |