summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAyanda-D <ayanda.dube@erlang-solutions.com>2020-11-26 17:02:43 +0000
committerAyanda-D <ayanda.dube@erlang-solutions.com>2020-11-26 17:02:49 +0000
commit5c469ed519a46f7e14efcd2aeaab9f80cd871113 (patch)
treeda5a2572434580cfd0c56e097b63745b59cc7692
parent482309718b22e34fbc282b3132712d6adeeb51ab (diff)
downloadrabbitmq-server-git-5c469ed519a46f7e14efcd2aeaab9f80cd871113.tar.gz
Ensure safe amqp client call timeouts on connection
establishment. This guarantees that the effective call timeouts are always safe, i.e. granting enough time to the underlying network operations, which must always timeout first in case of any unexpected lingering operations leading to timeouts. This eliminates the chance of leaking connection processes when call timeouts elapse, while underlying remote cconnection establishment call was still taking place.
-rw-r--r--deps/amqp_client/include/amqp_client_internal.hrl3
-rw-r--r--deps/amqp_client/src/amqp_connection.erl28
-rw-r--r--deps/amqp_client/src/amqp_util.erl10
3 files changed, 40 insertions, 1 deletions
diff --git a/deps/amqp_client/include/amqp_client_internal.hrl b/deps/amqp_client/include/amqp_client_internal.hrl
index 01e099097e..075f5f4a12 100644
--- a/deps/amqp_client/include/amqp_client_internal.hrl
+++ b/deps/amqp_client/include/amqp_client_internal.hrl
@@ -28,3 +28,6 @@
{<<"authentication_failure_close">>, bool, true}]).
-define(WAIT_FOR_CONFIRMS_TIMEOUT, {60000, millisecond}).
+
+-define(DIRECT_OPERATION_TIMEOUT, 120000).
+-define(CALL_TIMEOUT_DEVIATION, 10000).
diff --git a/deps/amqp_client/src/amqp_connection.erl b/deps/amqp_client/src/amqp_connection.erl
index 6800a44a3e..7aecda9108 100644
--- a/deps/amqp_client/src/amqp_connection.erl
+++ b/deps/amqp_client/src/amqp_connection.erl
@@ -170,6 +170,7 @@ start(AmqpParams, ConnName) when ConnName == undefined; is_binary(ConnName) ->
end,
AmqpParams2 = set_connection_name(ConnName, AmqpParams1),
AmqpParams3 = amqp_ssl:maybe_enhance_ssl_options(AmqpParams2),
+ ok = ensure_safe_call_timeout(AmqpParams3, amqp_util:call_timeout()),
{ok, _Sup, Connection} = amqp_sup:start_connection_sup(AmqpParams3),
amqp_gen_connection:connect(Connection).
@@ -393,3 +394,30 @@ connection_name(ConnectionPid) ->
{<<"connection_name">>, _, ConnName} -> ConnName;
false -> undefined
end.
+
+ensure_safe_call_timeout(#amqp_params_network{connection_timeout = ConnTimeout}, CallTimeout) ->
+ maybe_update_call_timeout(ConnTimeout, CallTimeout);
+ensure_safe_call_timeout(#amqp_params_direct{}, CallTimeout) ->
+ case net_kernel:get_net_ticktime() of
+ NetTicktime when is_integer(NetTicktime) ->
+ maybe_update_call_timeout(tick_or_direct_timeout(NetTicktime * 1000),
+ CallTimeout);
+ {ongoing_change_to, NetTicktime} ->
+ maybe_update_call_timeout(tick_or_direct_timeout(NetTicktime * 1000),
+ CallTimeout);
+ ignore ->
+ maybe_update_call_timeout(?DIRECT_OPERATION_TIMEOUT, CallTimeout)
+ end.
+
+maybe_update_call_timeout(BaseTimeout, CallTimeout)
+ when is_integer(BaseTimeout), CallTimeout > BaseTimeout ->
+ ok;
+maybe_update_call_timeout(BaseTimeout, CallTimeout) ->
+ EffectiveSafeCallTimeout = amqp_util:safe_call_timeout(BaseTimeout),
+ ?LOG_WARN("AMQP client call timeout was ~p millseconds, is updated to a safe effective "
+ "value of ~p millseconds", [CallTimeout, EffectiveSafeCallTimeout]),
+ amqp_util:update_call_timeout(EffectiveSafeCallTimeout),
+ ok.
+
+tick_or_direct_timeout(Timeout) when Timeout >= ?DIRECT_OPERATION_TIMEOUT -> Timeout;
+tick_or_direct_timeout(_Timeout) -> ?DIRECT_OPERATION_TIMEOUT.
diff --git a/deps/amqp_client/src/amqp_util.erl b/deps/amqp_client/src/amqp_util.erl
index df7ce30662..0324d4a171 100644
--- a/deps/amqp_client/src/amqp_util.erl
+++ b/deps/amqp_client/src/amqp_util.erl
@@ -2,7 +2,7 @@
-include("amqp_client_internal.hrl").
--export([call_timeout/0]).
+-export([call_timeout/0, update_call_timeout/1, safe_call_timeout/1]).
call_timeout() ->
case get(gen_server_call_timeout) of
@@ -15,3 +15,11 @@ call_timeout() ->
Timeout ->
Timeout
end.
+
+update_call_timeout(Timeout) ->
+ application:set_env(amqp_client, gen_server_call_timeout, Timeout),
+ put(gen_server_call_timeout, Timeout),
+ ok.
+
+safe_call_timeout(Threshold) ->
+ Threshold + ?CALL_TIMEOUT_DEVIATION.