summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Ansari <david.ansari@gmx.de>2021-07-15 19:20:47 +0200
committerDavid Ansari <david.ansari@gmx.de>2021-07-15 19:29:24 +0200
commit3964da37b446c4d430f8254fe801694c3e1d9304 (patch)
treec8e1714f60efdf1dc55583fb3aa34a930635bb8a
parent8a339aae6c2761c9c906365783ddf504397063bb (diff)
downloadrabbitmq-server-git-3964da37b446c4d430f8254fe801694c3e1d9304.tar.gz
Close TCP connection when stream reader times out
Add state timeouts. If the client takes more than 10s for a single step in the authentication protocol, make the server close the TCP connection. Also close the TCP connection if the server times out in state close_sent. That's the case when the client sends an invalid command (after successful authentication), the server requests the client to close the connection, but the client doesn't respond anymore.
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl62
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl43
2 files changed, 103 insertions, 2 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 5de3ce6c8a..c17a307b61 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -146,6 +146,12 @@
peer_cert_subject,
peer_cert_validity]).
+-ifdef(TEST).
+-define(STATE_TIMEOUT, 500).
+-else.
+-define(STATE_TIMEOUT, 10_000).
+-endif.
+
%% client API
-export([start_link/4,
info/2,
@@ -170,7 +176,7 @@
close_sent/3]).
callback_mode() ->
- state_functions.
+ [state_functions, state_enter].
terminate(Reason, State, _StatemData) ->
rabbit_log:debug("~p terminating in state '~s' with reason '~p'", [?MODULE, State, Reason]).
@@ -252,6 +258,13 @@ init([KeepaliveSup,
[Error, Reason])
end.
+tcp_connected(enter, _OldState, StateData) ->
+ {next_state, ?FUNCTION_NAME, StateData, {state_timeout, ?STATE_TIMEOUT, close}};
+tcp_connected(state_timeout, close, #statem_data{
+ transport = Transport,
+ connection = #stream_connection{socket = Socket}
+ }) ->
+ state_timeout(?FUNCTION_NAME, Transport, Socket);
tcp_connected(info, Msg, StateData) ->
handle_info(Msg, StateData,
fun (NextConnectionStep,
@@ -271,6 +284,13 @@ tcp_connected(info, Msg, StateData) ->
end
end).
+peer_properties_exchanged(enter, _OldState, StateData) ->
+ {next_state, ?FUNCTION_NAME, StateData, {state_timeout, ?STATE_TIMEOUT, close}};
+peer_properties_exchanged(state_timeout, close, #statem_data{
+ transport = Transport,
+ connection = #stream_connection{socket = Socket}
+ }) ->
+ state_timeout(?FUNCTION_NAME, Transport, Socket);
peer_properties_exchanged(info, Msg, StateData) ->
handle_info(Msg, StateData,
fun (NextConnectionStep,
@@ -290,6 +310,13 @@ peer_properties_exchanged(info, Msg, StateData) ->
end
end).
+authenticating(enter, _OldState, StateData) ->
+ {next_state, ?FUNCTION_NAME, StateData, {state_timeout, ?STATE_TIMEOUT, close}};
+authenticating(state_timeout, close, #statem_data{
+ transport = Transport,
+ connection = #stream_connection{socket = Socket}
+ }) ->
+ state_timeout(?FUNCTION_NAME, Transport, Socket);
authenticating(info, Msg, StateData) ->
handle_info(Msg, StateData,
fun(NextConnectionStep,
@@ -316,6 +343,13 @@ authenticating(info, Msg, StateData) ->
end
end).
+tuning(enter, _OldState, StateData) ->
+ {next_state, ?FUNCTION_NAME, StateData, {state_timeout, ?STATE_TIMEOUT, close}};
+tuning(state_timeout, close, #statem_data{
+ transport = Transport,
+ connection = #stream_connection{socket = Socket}
+ }) ->
+ state_timeout(?FUNCTION_NAME, Transport, Socket);
tuning(info, Msg, StateData) ->
handle_info(Msg, StateData,
fun (NextConnectionStep,
@@ -339,6 +373,13 @@ tuning(info, Msg, StateData) ->
end
end).
+tuned(enter, _OldState, StateData) ->
+ {next_state, ?FUNCTION_NAME, StateData, {state_timeout, ?STATE_TIMEOUT, close}};
+tuned(state_timeout, close, #statem_data{
+ transport = Transport,
+ connection = #stream_connection{socket = Socket}
+ }) ->
+ state_timeout(?FUNCTION_NAME, Transport, Socket);
tuned(info, Msg, StateData) ->
handle_info(Msg, StateData,
fun (NextConnectionStep,
@@ -354,6 +395,11 @@ tuned(info, Msg, StateData) ->
end
end).
+state_timeout(State, Transport, Socket) ->
+ rabbit_log_connection:warning("Closing connection because of timeout in state '~s'.", [State]),
+ close_immediately(Transport, Socket),
+ stop.
+
handle_info(Msg, #statem_data{
transport = Transport,
connection = #stream_connection{socket = S, connection_step = PreviousConnectionStep} = Connection,
@@ -529,6 +575,8 @@ close_immediately(Transport, S) ->
Transport:shutdown(S, read),
Transport:close(S).
+open(enter, _OldState, StateData) ->
+ {next_state, ?FUNCTION_NAME, StateData};
open(info,
{resource_alarm, IsThereAlarm},
#statem_data{
@@ -949,6 +997,18 @@ open(cast, {force_event_refresh, Ref}, #statem_data{
Connection2 = ensure_stats_timer(Connection1),
{keep_state, StatemData#statem_data{connection = Connection2}}.
+close_sent(enter, _OldState, StateData) ->
+ {next_state, ?FUNCTION_NAME, StateData, {state_timeout, ?STATE_TIMEOUT, close}};
+close_sent(state_timeout, close, #statem_data{
+ transport = Transport,
+ connection = #stream_connection{socket = Socket} = Connection,
+ connection_state = State
+ }) ->
+ rabbit_log_connection:warning("Closing connection because of timeout in state '~s'.", [?FUNCTION_NAME]),
+ close(Transport, Socket, State),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection, State),
+ stop;
close_sent(info, {tcp, S, Data}, #statem_data{
transport = Transport,
connection = Connection,
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index 7d1c6b576e..4488b1e59c 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -39,8 +39,12 @@ groups() ->
test_gc_consumers,
test_gc_publishers,
unauthenticated_client_rejected_tcp_connected,
+ timeout_tcp_connected,
unauthenticated_client_rejected_peer_properties_exchanged,
- unauthenticated_client_rejected_authenticating]},
+ timeout_peer_properties_exchanged,
+ unauthenticated_client_rejected_authenticating,
+ timeout_authenticating,
+ timeout_close_sent]},
%% Run `test_global_counters` on its own so the global metrics are
%% initialised to 0 for each testcase
{single_node, [], [test_global_counters]},
@@ -189,6 +193,11 @@ unauthenticated_client_rejected_tcp_connected(Config) ->
?assertEqual(ok, gen_tcp:send(S, <<"invalid data">>)),
?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).
+timeout_tcp_connected(Config) ->
+ Port = get_stream_port(Config),
+ {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
+ ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).
+
unauthenticated_client_rejected_peer_properties_exchanged(Config) ->
Port = get_stream_port(Config),
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
@@ -197,6 +206,13 @@ unauthenticated_client_rejected_peer_properties_exchanged(Config) ->
?assertEqual(ok, gen_tcp:send(S, <<"invalid data">>)),
?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).
+timeout_peer_properties_exchanged(Config) ->
+ Port = get_stream_port(Config),
+ {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
+ C0 = rabbit_stream_core:init(0),
+ test_peer_properties(gen_tcp, S, C0),
+ ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).
+
unauthenticated_client_rejected_authenticating(Config) ->
Port = get_stream_port(Config),
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
@@ -206,6 +222,31 @@ unauthenticated_client_rejected_authenticating(Config) ->
?assertEqual(ok, gen_tcp:send(S, SaslHandshakeFrame)),
?awaitMatch({error, closed}, gen_tcp:send(S, <<"invalid data">>), ?WAIT).
+timeout_authenticating(Config) ->
+ Port = get_stream_port(Config),
+ {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
+ C0 = rabbit_stream_core:init(0),
+ test_peer_properties(gen_tcp, S, C0),
+ SaslHandshakeFrame = rabbit_stream_core:frame({request, 1, sasl_handshake}),
+ ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).
+
+timeout_close_sent(Config) ->
+ Port = get_stream_port(Config),
+ {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
+ C0 = rabbit_stream_core:init(0),
+ C1 = test_peer_properties(gen_tcp, S, C0),
+ C2 = test_authenticate(gen_tcp, S, C1),
+ % Trigger rabbit_stream_reader to transition to state close_sent
+ NonExistentCommand = 999,
+ IOData = <<?REQUEST:1, NonExistentCommand:15, ?VERSION_1:16>>,
+ Size = iolist_size(IOData),
+ Frame = [<<Size:32>> | IOData],
+ ok = gen_tcp:send(S, Frame),
+ {{request, _CorrelationID, {close, ?RESPONSE_CODE_UNKNOWN_FRAME, <<"unknown frame">>}}, _Config}
+ = receive_commands(gen_tcp, S, C2),
+ % Now, rabbit_stream_reader is in state close_sent.
+ ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).
+
consumer_count(Config) ->
ets_count(Config, ?TABLE_CONSUMER).