summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2020-12-01 06:16:03 +0300
committerMichael Klishin <michael@clojurewerkz.org>2020-12-01 06:16:03 +0300
commite6c97547e498d4b5361a626dde92872c582ae107 (patch)
tree4d3c7d124664f46d09a4e744d018887cef7ca8be
parent7559504799dcc6efd49e8a1506c0bc9df3de8928 (diff)
parenta5c8de73ce77582023957657d2d66126a76cd514 (diff)
downloadrabbitmq-server-git-e6c97547e498d4b5361a626dde92872c582ae107.tar.gz
Merge branch 'Ayanda-D-call-timeout-safety' into master
-rw-r--r--deps/amqp_client/Makefile2
-rw-r--r--deps/amqp_client/include/amqp_client_internal.hrl3
-rw-r--r--deps/amqp_client/src/amqp_channel_sup.erl13
-rw-r--r--deps/amqp_client/src/amqp_connection.erl28
-rw-r--r--deps/amqp_client/src/amqp_direct_connection.erl6
-rw-r--r--deps/amqp_client/src/amqp_gen_connection.erl4
-rw-r--r--deps/amqp_client/src/amqp_util.erl10
-rw-r--r--deps/amqp_client/test/system_SUITE.erl120
8 files changed, 175 insertions, 11 deletions
diff --git a/deps/amqp_client/Makefile b/deps/amqp_client/Makefile
index b15fd918f0..186441ce9a 100644
--- a/deps/amqp_client/Makefile
+++ b/deps/amqp_client/Makefile
@@ -30,7 +30,7 @@ PACKAGES_DIR ?= $(abspath PACKAGES)
LOCAL_DEPS = xmerl
DEPS = rabbit_common
-TEST_DEPS = rabbitmq_ct_helpers rabbit
+TEST_DEPS = rabbitmq_ct_helpers rabbit meck
DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-test.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-build.mk \
diff --git a/deps/amqp_client/include/amqp_client_internal.hrl b/deps/amqp_client/include/amqp_client_internal.hrl
index 01e099097e..075f5f4a12 100644
--- a/deps/amqp_client/include/amqp_client_internal.hrl
+++ b/deps/amqp_client/include/amqp_client_internal.hrl
@@ -28,3 +28,6 @@
{<<"authentication_failure_close">>, bool, true}]).
-define(WAIT_FOR_CONFIRMS_TIMEOUT, {60000, millisecond}).
+
+-define(DIRECT_OPERATION_TIMEOUT, 120000).
+-define(CALL_TIMEOUT_DEVIATION, 10000).
diff --git a/deps/amqp_client/src/amqp_channel_sup.erl b/deps/amqp_client/src/amqp_channel_sup.erl
index 9bd85ce946..bc346c9584 100644
--- a/deps/amqp_client/src/amqp_channel_sup.erl
+++ b/deps/amqp_client/src/amqp_channel_sup.erl
@@ -47,9 +47,16 @@ start_link(Type, Connection, ConnName, InfraArgs, ChNumber,
start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector, AmqpParams],
ConnName, ChNumber, ChPid) ->
- rpc:call(Node, rabbit_direct, start_channel,
- [ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User,
- VHost, ?CLIENT_CAPABILITIES, Collector, AmqpParams]);
+ case rpc:call(Node, rabbit_direct, start_channel,
+ [ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User,
+ VHost, ?CLIENT_CAPABILITIES, Collector, AmqpParams], ?DIRECT_OPERATION_TIMEOUT) of
+ {ok, _Writer} = Reply ->
+ Reply;
+ {badrpc, Reason} ->
+ {error, {Reason, Node}};
+ Error ->
+ Error
+ end;
start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) ->
GCThreshold = application:get_env(amqp_client, writer_gc_threshold, ?DEFAULT_GC_THRESHOLD),
supervisor2:start_child(
diff --git a/deps/amqp_client/src/amqp_connection.erl b/deps/amqp_client/src/amqp_connection.erl
index 6800a44a3e..d69355453d 100644
--- a/deps/amqp_client/src/amqp_connection.erl
+++ b/deps/amqp_client/src/amqp_connection.erl
@@ -170,6 +170,7 @@ start(AmqpParams, ConnName) when ConnName == undefined; is_binary(ConnName) ->
end,
AmqpParams2 = set_connection_name(ConnName, AmqpParams1),
AmqpParams3 = amqp_ssl:maybe_enhance_ssl_options(AmqpParams2),
+ ok = ensure_safe_call_timeout(AmqpParams3, amqp_util:call_timeout()),
{ok, _Sup, Connection} = amqp_sup:start_connection_sup(AmqpParams3),
amqp_gen_connection:connect(Connection).
@@ -393,3 +394,30 @@ connection_name(ConnectionPid) ->
{<<"connection_name">>, _, ConnName} -> ConnName;
false -> undefined
end.
+
+ensure_safe_call_timeout(#amqp_params_network{connection_timeout = ConnTimeout}, CallTimeout) ->
+ maybe_update_call_timeout(ConnTimeout, CallTimeout);
+ensure_safe_call_timeout(#amqp_params_direct{}, CallTimeout) ->
+ case net_kernel:get_net_ticktime() of
+ NetTicktime when is_integer(NetTicktime) ->
+ maybe_update_call_timeout(tick_or_direct_timeout(NetTicktime * 1000),
+ CallTimeout);
+ {ongoing_change_to, NetTicktime} ->
+ maybe_update_call_timeout(tick_or_direct_timeout(NetTicktime * 1000),
+ CallTimeout);
+ ignore ->
+ maybe_update_call_timeout(?DIRECT_OPERATION_TIMEOUT, CallTimeout)
+ end.
+
+maybe_update_call_timeout(BaseTimeout, CallTimeout)
+ when is_integer(BaseTimeout), CallTimeout > BaseTimeout ->
+ ok;
+maybe_update_call_timeout(BaseTimeout, CallTimeout) ->
+ EffectiveSafeCallTimeout = amqp_util:safe_call_timeout(BaseTimeout),
+ ?LOG_WARN("AMQP 0-9-1 client call timeout was ~p ms, is updated to a safe effective "
+ "value of ~p ms", [CallTimeout, EffectiveSafeCallTimeout]),
+ amqp_util:update_call_timeout(EffectiveSafeCallTimeout),
+ ok.
+
+tick_or_direct_timeout(Timeout) when Timeout >= ?DIRECT_OPERATION_TIMEOUT -> Timeout;
+tick_or_direct_timeout(_Timeout) -> ?DIRECT_OPERATION_TIMEOUT.
diff --git a/deps/amqp_client/src/amqp_direct_connection.erl b/deps/amqp_client/src/amqp_direct_connection.erl
index a07c67074e..ea486aacd1 100644
--- a/deps/amqp_client/src/amqp_direct_connection.erl
+++ b/deps/amqp_client/src/amqp_direct_connection.erl
@@ -143,7 +143,7 @@ connect(Params = #amqp_params_direct{username = Username,
DecryptedPassword = credentials_obfuscation:decrypt(Password),
case rpc:call(Node, rabbit_direct, connect,
[{Username, DecryptedPassword}, VHost, ?PROTOCOL, self(),
- connection_info(State1)]) of
+ connection_info(State1)], ?DIRECT_OPERATION_TIMEOUT) of
{ok, {User, ServerProperties}} ->
{ok, ChMgr, Collector} = SIF(i(name, State1)),
State2 = State1#state{user = User,
@@ -158,8 +158,8 @@ connect(Params = #amqp_params_direct{username = Username,
{ok, {ServerProperties, 0, ChMgr, State2}};
{error, _} = E ->
E;
- {badrpc, nodedown} ->
- {error, {nodedown, Node}}
+ {badrpc, Reason} ->
+ {error, {Reason, Node}}
end.
ensure_adapter_info(none) ->
diff --git a/deps/amqp_client/src/amqp_gen_connection.erl b/deps/amqp_client/src/amqp_gen_connection.erl
index 5c826a5b5f..90a262ae9b 100644
--- a/deps/amqp_client/src/amqp_gen_connection.erl
+++ b/deps/amqp_client/src/amqp_gen_connection.erl
@@ -49,12 +49,14 @@ connect(Pid) ->
gen_server:call(Pid, connect, amqp_util:call_timeout()).
open_channel(Pid, ProposedNumber, Consumer) ->
- case gen_server:call(Pid,
+ try gen_server:call(Pid,
{command, {open_channel, ProposedNumber, Consumer}},
amqp_util:call_timeout()) of
{ok, ChannelPid} -> ok = amqp_channel:open(ChannelPid),
{ok, ChannelPid};
Error -> Error
+ catch
+ _:Reason -> {error, Reason}
end.
hard_error_in_channel(Pid, ChannelPid, Reason) ->
diff --git a/deps/amqp_client/src/amqp_util.erl b/deps/amqp_client/src/amqp_util.erl
index df7ce30662..0324d4a171 100644
--- a/deps/amqp_client/src/amqp_util.erl
+++ b/deps/amqp_client/src/amqp_util.erl
@@ -2,7 +2,7 @@
-include("amqp_client_internal.hrl").
--export([call_timeout/0]).
+-export([call_timeout/0, update_call_timeout/1, safe_call_timeout/1]).
call_timeout() ->
case get(gen_server_call_timeout) of
@@ -15,3 +15,11 @@ call_timeout() ->
Timeout ->
Timeout
end.
+
+update_call_timeout(Timeout) ->
+ application:set_env(amqp_client, gen_server_call_timeout, Timeout),
+ put(gen_server_call_timeout, Timeout),
+ ok.
+
+safe_call_timeout(Threshold) ->
+ Threshold + ?CALL_TIMEOUT_DEVIATION.
diff --git a/deps/amqp_client/test/system_SUITE.erl b/deps/amqp_client/test/system_SUITE.erl
index 9e39e468b7..45ada5cf19 100644
--- a/deps/amqp_client/test/system_SUITE.erl
+++ b/deps/amqp_client/test/system_SUITE.erl
@@ -77,9 +77,10 @@ all() ->
{hard_error_loop, [{repeat, 100}, parallel], [hard_error]}
]).
-define(COMMON_NON_PARALLEL_TEST_CASES, [
- basic_qos, %% Not parallel because it's time-based.
+ basic_qos, %% Not parallel because it's time-based, or has mocks
connection_failure,
- channel_death
+ channel_death,
+ safe_call_timeouts
]).
groups() ->
@@ -294,6 +295,111 @@ named_connection(Config) ->
%% -------------------------------------------------------------------
+safe_call_timeouts(Config) ->
+ Params = ?config(amqp_client_conn_params, Config),
+ safe_call_timeouts_test(Params).
+
+safe_call_timeouts_test(Params = #amqp_params_network{}) ->
+ TestConnTimeout = 2000,
+ TestCallTimeout = 1000,
+
+ Params1 = Params#amqp_params_network{connection_timeout = TestConnTimeout},
+
+ %% Normal connection
+ amqp_util:update_call_timeout(TestCallTimeout),
+
+ {ok, Connection1} = amqp_connection:start(Params1),
+ ?assertEqual(TestConnTimeout + ?CALL_TIMEOUT_DEVIATION, amqp_util:call_timeout()),
+
+ ?assertEqual(ok, amqp_connection:close(Connection1)),
+ wait_for_death(Connection1),
+
+ %% Failing connection
+ amqp_util:update_call_timeout(TestCallTimeout),
+
+ ok = meck:new(amqp_network_connection, [passthrough]),
+ ok = meck:expect(amqp_network_connection, connect,
+ fun(_AmqpParams, _SIF, _TypeSup, _State) ->
+ timer:sleep(TestConnTimeout),
+ {error, test_connection_timeout}
+ end),
+
+ ?assertEqual({error, test_connection_timeout}, amqp_connection:start(Params1)),
+
+ ?assertEqual(TestConnTimeout + ?CALL_TIMEOUT_DEVIATION, amqp_util:call_timeout()),
+
+ meck:unload(amqp_network_connection);
+
+safe_call_timeouts_test(Params = #amqp_params_direct{}) ->
+ TestCallTimeout = 30000,
+ NetTicktime0 = net_kernel:get_net_ticktime(),
+ amqp_util:update_call_timeout(TestCallTimeout),
+
+ %% 1. NetTicktime >= DIRECT_OPERATION_TIMEOUT (120s)
+ NetTicktime1 = 140,
+ net_kernel:set_net_ticktime(NetTicktime1, 1),
+ wait_until_net_ticktime(NetTicktime1),
+
+ {ok, Connection1} = amqp_connection:start(Params),
+ ?assertEqual((NetTicktime1 * 1000) + ?CALL_TIMEOUT_DEVIATION,
+ amqp_util:call_timeout()),
+
+ ?assertEqual(ok, amqp_connection:close(Connection1)),
+ wait_for_death(Connection1),
+
+ %% Reset call timeout
+ amqp_util:update_call_timeout(TestCallTimeout),
+
+ %% 2. Transitioning NetTicktime >= DIRECT_OPERATION_TIMEOUT (120s)
+ NetTicktime2 = 120,
+ net_kernel:set_net_ticktime(NetTicktime2, 1),
+ ?assertEqual({ongoing_change_to, NetTicktime2}, net_kernel:get_net_ticktime()),
+
+ {ok, Connection2} = amqp_connection:start(Params),
+ ?assertEqual((NetTicktime2 * 1000) + ?CALL_TIMEOUT_DEVIATION,
+ amqp_util:call_timeout()),
+
+ wait_until_net_ticktime(NetTicktime2),
+
+ ?assertEqual(ok, amqp_connection:close(Connection2)),
+ wait_for_death(Connection2),
+
+ %% Reset call timeout
+ amqp_util:update_call_timeout(TestCallTimeout),
+
+ %% 3. NetTicktime < DIRECT_OPERATION_TIMEOUT (120s)
+ NetTicktime3 = 60,
+ net_kernel:set_net_ticktime(NetTicktime3, 1),
+ wait_until_net_ticktime(NetTicktime3),
+
+ {ok, Connection3} = amqp_connection:start(Params),
+ ?assertEqual((?DIRECT_OPERATION_TIMEOUT + ?CALL_TIMEOUT_DEVIATION),
+ amqp_util:call_timeout()),
+
+ net_kernel:set_net_ticktime(NetTicktime0, 1),
+ wait_until_net_ticktime(NetTicktime0),
+ ?assertEqual(ok, amqp_connection:close(Connection3)),
+ wait_for_death(Connection3),
+
+ %% Failing direct connection
+ amqp_util:update_call_timeout(_LowCallTimeout = 1000),
+
+ ok = meck:new(amqp_direct_connection, [passthrough]),
+ ok = meck:expect(amqp_direct_connection, connect,
+ fun(_AmqpParams, _SIF, _TypeSup, _State) ->
+ timer:sleep(2000),
+ {error, test_connection_timeout}
+ end),
+
+ ?assertEqual({error, test_connection_timeout}, amqp_connection:start(Params)),
+
+ ?assertEqual((?DIRECT_OPERATION_TIMEOUT + ?CALL_TIMEOUT_DEVIATION),
+ amqp_util:call_timeout()),
+
+ meck:unload(amqp_direct_connection).
+
+%% -------------------------------------------------------------------
+
simultaneous_close(Config) ->
{ok, Connection} = new_connection(Config),
%% We pick a high channel number, to avoid any conflict with other
@@ -1456,6 +1562,16 @@ assert_down_with_error(MonitorRef, CodeAtom) ->
exit(did_not_die)
end.
+wait_until_net_ticktime(NetTicktime) ->
+ case net_kernel:get_net_ticktime() of
+ NetTicktime -> ok;
+ {ongoing_change_to, NetTicktime} ->
+ timer:sleep(1000),
+ wait_until_net_ticktime(NetTicktime);
+ _ ->
+ throw({error, {net_ticktime_not_set, NetTicktime}})
+ end.
+
set_resource_alarm(memory, Config) ->
SrcDir = ?config(amqp_client_srcdir, Config),
Nodename = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),