summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Ansari <david.ansari@gmx.de>2023-03-17 12:48:26 +0000
committerDavid Ansari <david.ansari@gmx.de>2023-05-15 15:19:55 +0000
commite387493e613617fdf84088696bca8b4360860aee (patch)
tree47a7a1e686a8422bf014b26bb9f580b13a1dcb9d
parentd1fa6e83c3a127f56c7bb10c64e597757629b060 (diff)
downloadrabbitmq-server-git-e387493e613617fdf84088696bca8b4360860aee.tar.gz
Support Session Expiry Interval
Allow Session Expiry Interval to be changed when client DISCONNECTs. Deprecate config subscription_ttl in favour of max_session_expiry_interval_secs because the Session Expiry Interval will also apply to publishers that connect with a will message and will delay interval. "The additional session state of an MQTT v5 server includes: * The Will Message and the Will Delay Interval * If the Session is currently not connected, the time at which the Session will end and Session State will be discarded." The Session Expiry Interval picked by the server and sent to the client in the CONNACK is the minimum of max_session_expiry_interval_secs and the requested Session Expiry Interval by the client in CONNECT. This commit favours dynamically changing the queue argument x-expires over creating millions of different policies since that many policies will come with new scalability issues. Dynamically changing queue arguments is not allowed by AMQP 0.9.1 clients. However, it should be perfectly okay for the MQTT plugin to do so for the queues it manages. MQTT clients are not aware that these queues exist.
-rw-r--r--deps/rabbit/docs/rabbitmq.conf.example6
-rw-r--r--deps/rabbitmq_mqtt/BUILD.bazel5
-rw-r--r--deps/rabbitmq_mqtt/Makefile2
-rw-r--r--deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl3
-rw-r--r--deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema29
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_packet.erl4
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl225
-rw-r--r--deps/rabbitmq_mqtt/test/auth_SUITE.erl11
-rw-r--r--deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets14
-rw-r--r--deps/rabbitmq_mqtt/test/reader_SUITE.erl23
-rw-r--r--deps/rabbitmq_mqtt/test/shared_SUITE.erl42
-rw-r--r--deps/rabbitmq_mqtt/test/util.erl10
-rw-r--r--deps/rabbitmq_mqtt/test/v5_SUITE.erl156
13 files changed, 401 insertions, 129 deletions
diff --git a/deps/rabbit/docs/rabbitmq.conf.example b/deps/rabbit/docs/rabbitmq.conf.example
index 00dec1473c..ddf4e0d8ac 100644
--- a/deps/rabbit/docs/rabbitmq.conf.example
+++ b/deps/rabbit/docs/rabbitmq.conf.example
@@ -890,9 +890,11 @@
##
# mqtt.exchange = amq.topic
-## Specify TTL (time to live) to control the lifetime of non-clean sessions.
+## Define the maximum Session Expiry Interval in seconds allowed by the server.
+## 'infinity' means the session does not expire.
+## An MQTT 5.0 client can choose a lower value.
##
-# mqtt.subscription_ttl = 1800000
+# mqtt.max_session_expiry_interval_secs = 1800
## Set the prefetch count (governing the maximum number of unacknowledged
## messages that will be delivered).
diff --git a/deps/rabbitmq_mqtt/BUILD.bazel b/deps/rabbitmq_mqtt/BUILD.bazel
index e32c734179..628d84e8c0 100644
--- a/deps/rabbitmq_mqtt/BUILD.bazel
+++ b/deps/rabbitmq_mqtt/BUILD.bazel
@@ -34,7 +34,7 @@ APP_ENV = """[
{allow_anonymous, true},
{vhost, <<"/">>},
{exchange, <<"amq.topic">>},
- {subscription_ttl, 86400000}, %% 24 hours
+ {max_session_expiry_interval_secs, 86400}, %% 1 day
{retained_message_store, rabbit_mqtt_retained_msg_store_dets},
%% only used by DETS store
{retained_message_store_dets_sync_interval, 2000},
@@ -256,7 +256,8 @@ rabbitmq_suite(
rabbitmq_integration_suite(
name = "v5_SUITE",
- shard_count = 4,
+ # shard_count = 4,
+ shard_count = 2,
runtime_deps = [
"@emqtt//:erlang_app",
"@gun//:erlang_app",
diff --git a/deps/rabbitmq_mqtt/Makefile b/deps/rabbitmq_mqtt/Makefile
index 7b94b872b6..985b0e20c4 100644
--- a/deps/rabbitmq_mqtt/Makefile
+++ b/deps/rabbitmq_mqtt/Makefile
@@ -11,7 +11,7 @@ define PROJECT_ENV
{allow_anonymous, true},
{vhost, <<"/">>},
{exchange, <<"amq.topic">>},
- {subscription_ttl, 86400000}, %% 24 hours
+ {max_session_expiry_interval_secs, 86400}, %% 1 day
{retained_message_store, rabbit_mqtt_retained_msg_store_dets},
%% only used by DETS store
{retained_message_store_dets_sync_interval, 2000},
diff --git a/deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl b/deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl
index 5322b42908..be6e34e3ef 100644
--- a/deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl
+++ b/deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl
@@ -157,8 +157,7 @@
will_retain :: boolean(),
will_qos :: qos(),
will_flag :: boolean(),
- %% corresponds to Clean Start in MQTT 5.0
- clean_sess :: boolean(),
+ clean_start :: boolean(),
keep_alive :: non_neg_integer(),
props :: properties(),
client_id :: binary(),
diff --git a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema
index 684694cc9f..da2a64e7d5 100644
--- a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema
+++ b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema
@@ -65,19 +65,34 @@ fun(Conf) ->
list_to_binary(cuttlefish:conf_get("mqtt.exchange", Conf))
end}.
-%% Specify TTL (time to live) to control the lifetime of non-clean sessions.
-%%
-%% {subscription_ttl, 1800000},
{mapping, "mqtt.subscription_ttl", "rabbitmq_mqtt.subscription_ttl", [
{datatype, [{enum, [undefined, infinity]}, integer]}
]}.
{translation, "rabbitmq_mqtt.subscription_ttl",
fun(Conf) ->
- case cuttlefish:conf_get("mqtt.subscription_ttl", Conf, undefined) of
- undefined -> undefined;
- infinity -> undefined;
- Ms -> Ms
+ cuttlefish:warn(
+ "Since 3.13 mqtt.subscription_ttl (in milliseconds) is deprecated and "
+ "has no effect anymore. Use mqtt.max_session_expiry_interval_secs (in "
+ "seconds) instead."),
+ cuttlefish:unset()
+end}.
+
+%% Defines the maximum Session Expiry Interval in seconds allowed by the server.
+%% 'infinity' means the session does not expire.
+%% An MQTT 5.0 client can choose a lower value.
+
+{mapping, "mqtt.max_session_expiry_interval_secs", "rabbitmq_mqtt.max_session_expiry_interval_secs", [
+ {datatype, [integer, {atom, infinity}]}
+]}.
+
+{translation, "rabbitmq_mqtt.max_session_expiry_interval_secs",
+fun(Conf) ->
+ case cuttlefish:conf_get("mqtt.max_session_expiry_interval_secs", Conf) of
+ N when is_integer(N) andalso N < 0 ->
+ cuttlefish:invalid("negative integer not allowed");
+ Valid ->
+ Valid
end
end}.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_packet.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_packet.erl
index 0f2314fb6a..c40f07de1d 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_packet.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_packet.erl
@@ -205,7 +205,7 @@ parse_connect(<<Len:16, ProtoName:Len/binary,
WillRetain:1,
WillQos:2,
WillFlag:1,
- CleanSession:1,
+ CleanStart:1,
_Reserved:1,
KeepAlive:16,
Rest0/binary>>) ->
@@ -229,7 +229,7 @@ parse_connect(<<Len:16, ProtoName:Len/binary,
will_retain = int_to_bool(WillRetain),
will_qos = WillQos,
will_flag = int_to_bool(WillFlag),
- clean_sess = int_to_bool(CleanSession),
+ clean_start = int_to_bool(CleanStart),
keep_alive = KeepAlive,
props = Props,
client_id = ClientId,
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
index 0b25c4e804..82744a5c2c 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
@@ -37,8 +37,10 @@
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(CONSUMER_TAG, <<"mqtt">>).
+-define(QUEUE_TTL_KEY, <<"x-expires">>).
-type send_fun() :: fun((iodata()) -> ok).
+-type session_expiry_interval() :: non_neg_integer() | infinity.
-record(auth_state,
{user :: #user{},
@@ -48,7 +50,8 @@
-record(cfg,
{socket :: rabbit_net:socket(),
proto_ver :: protocol_version_atom(),
- clean_sess :: boolean(),
+ clean_start :: boolean(),
+ session_expiry_interval_secs :: session_expiry_interval(),
will_msg :: option(mqtt_msg()),
exchange :: rabbit_exchange:name(),
%% Set if client has at least one subscription with QoS 1.
@@ -118,24 +121,52 @@ process_connect(
username = Username0,
password = Password0,
proto_ver = ProtoVer,
- clean_sess = CleanSess,
+ clean_start = CleanStart,
client_id = ClientId0,
keep_alive = KeepaliveSecs,
props = ConnectProps} = Packet,
Socket, ConnName0, SendFun, {PeerIp, PeerPort, Ip, Port}) ->
- ?LOG_DEBUG("Received a CONNECT, client ID: ~s, username: ~s, clean session: ~s, "
+ ?LOG_DEBUG("Received a CONNECT, client ID: ~s, username: ~s, clean start: ~s, "
"protocol version: ~p, keepalive: ~p, property names: ~p",
- [ClientId0, Username0, CleanSess, ProtoVer, KeepaliveSecs, maps:keys(ConnectProps)]),
+ [ClientId0, Username0, CleanStart, ProtoVer, KeepaliveSecs, maps:keys(ConnectProps)]),
SslLoginName = ssl_login_name(Socket),
Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of
true -> flow;
false -> noflow
end,
MaxPacketSize = maps:get('Maximum-Packet-Size', ConnectProps, ?MAX_PACKET_SIZE),
+ {ok, MaxSessionExpiry} = application:get_env(?APP_NAME, max_session_expiry_interval_secs),
+ SessionExpiry =
+ case {ProtoVer, CleanStart} of
+ {5, _} ->
+ %% "If the Session Expiry Interval is absent the value 0 is used."
+ case maps:get('Session-Expiry-Interval', ConnectProps, 0) of
+ ?UINT_MAX ->
+ %% "If the Session Expiry Interval is 0xFFFFFFFF (UINT_MAX),
+ %% the Session does not expire."
+ min(infinity, MaxSessionExpiry);
+ Seconds ->
+ min(Seconds, MaxSessionExpiry)
+ end;
+ {_, _CleanSession = true} ->
+ %% "Setting Clean Start to 1 and a Session Expiry Interval of 0, is equivalent
+ %% to setting CleanSession to 1 in the MQTT Specification Version 3.1.1."
+ 0;
+ {_, _CleanSession = false} ->
+ %% The following sentence of the MQTT 5 spec 3.1.2.11.2 is wrong:
+ %% "Setting Clean Start to 0 and no Session Expiry Interval, is equivalent to
+ %% setting CleanSession to 0 in the MQTT Specification Version 3.1.1."
+ %% Correct is:
+ %% "CleanStart=0 and SessionExpiry=0xFFFFFFFF (UINT_MAX) for MQTT 5.0 would
+ %% provide the same as CleanSession=0 for 3.1.1."
+ %% see https://issues.oasis-open.org/projects/MQTT/issues/MQTT-538
+ %% Therefore, we use the maximum allowed session expiry interval.
+ MaxSessionExpiry
+ end,
Result0 =
maybe
ok ?= check_protocol_version(ProtoVer),
- {ok, ClientId} ?= ensure_client_id(ClientId0, CleanSess, ProtoVer),
+ {ok, ClientId} ?= ensure_client_id(ClientId0, CleanStart, ProtoVer),
{ok, {Username1, Password}} ?= check_credentials(Username0, Password0, SslLoginName, PeerIp),
{VHostPickedUsing, {VHost, Username2}} = get_vhost(Username1, SslLoginName, Port),
@@ -157,7 +188,8 @@ process_connect(
#state{
cfg = #cfg{socket = Socket,
proto_ver = proto_integer_to_atom(ProtoVer),
- clean_sess = CleanSess,
+ clean_start = CleanStart,
+ session_expiry_interval_secs = SessionExpiry,
ssl_login_name = SslLoginName,
delivery_flow = Flow,
trace_state = TraceState,
@@ -191,7 +223,11 @@ process_connect(
'Maximum-Packet-Size' => persistent_term:get(
?PERSISTENT_TERM_MAX_PACKET_SIZE_AUTHENTICATED),
'Subscription-Identifier-Available' => 0,
- 'Shared-Subscription-Available' => 0
+ 'Shared-Subscription-Available' => 0,
+ 'Session-Expiry-Interval' => case SessionExpiry of
+ infinity -> ?UINT_MAX;
+ Secs -> Secs
+ end
},
Props = case {ClientId0, ProtoVer} of
{<<>>, 5} ->
@@ -250,8 +286,8 @@ connect_reason_code_to_return_code(_) ->
process_connect(State0) ->
maybe
- {ok, QoS0SessPresent, State1} ?= handle_clean_sess_qos0(State0),
- {ok, SessPresent, State2} ?= handle_clean_sess_qos1(QoS0SessPresent, State1),
+ {ok, QoS0SessPresent, State1} ?= handle_clean_start_qos0(State0),
+ {ok, SessPresent, State2} ?= handle_clean_start_qos1(QoS0SessPresent, State1),
State = cache_subscriptions(SessPresent, State2),
rabbit_networking:register_non_amqp_connection(self()),
self() ! connection_created,
@@ -313,7 +349,7 @@ process_request(?PUBLISH,
%% "If the Server included a Maximum QoS in its CONNACK response
%% to a Client and it receives a PUBLISH packet with a QoS greater than this
%% then it uses DISCONNECT with Reason Code 0x9B (QoS not supported)."
- ?LOG_WARNING("Received PUBLISH with QoS2. Disconnecting MQTT client ~ts", [ClientId]),
+ ?LOG_WARNING("Received a PUBLISH with QoS2. Disconnecting MQTT client ~ts", [ClientId]),
send_disconnect(?RC_QOS_NOT_SUPPORTED, State),
{stop, {disconnect, server_initiated}, State};
process_request(?PUBLISH,
@@ -445,10 +481,73 @@ process_request(?PINGREQ, #mqtt_packet{}, State = #state{cfg = #cfg{client_id =
?LOG_DEBUG("Sent a PINGRESP to client ID ~s", [ClientId]),
{ok, State};
-process_request(?DISCONNECT, #mqtt_packet{}, State) ->
- ?LOG_DEBUG("Received a DISCONNECT"),
+%%TODO Check ReasonCode whether to publish a will message:
+%% "If the Network Connection is closed without the Client first sending a DISCONNECT packet with Reason
+%% Code 0x00 (Normal disconnection) and the Connection has a Will Message, the Will Message is published."
+process_request(?DISCONNECT,
+ #mqtt_packet{variable = #mqtt_packet_disconnect{reason_code = Rc,
+ props = Props}},
+ #state{cfg = #cfg{session_expiry_interval_secs = CurrentSEI} = Cfg} = State0) ->
+ ?LOG_DEBUG("Received a DISCONNECT with reason code ~b and properties ~p", [Rc, Props]),
+ RequestedSEI = case maps:find('Session-Expiry-Interval', Props) of
+ {ok, ?UINT_MAX} ->
+ %% "If the Session Expiry Interval is 0xFFFFFFFF (UINT_MAX),
+ %% the Session does not expire."
+ infinity;
+ {ok, Secs} ->
+ Secs;
+ error ->
+ %% "If the Session Expiry Interval is absent, the Session
+ %% Expiry Interval in the CONNECT packet is used."
+ CurrentSEI
+ end,
+ State =
+ case CurrentSEI of
+ RequestedSEI ->
+ State0;
+ 0 when RequestedSEI > 0 ->
+ %% "If the Session Expiry Interval in the CONNECT packet was zero, then it is a Protocol
+ %% Error to set a non-zero Session Expiry Interval in the DISCONNECT packet sent by the
+ %% Client. If such a non-zero Session Expiry Interval is received by the Server, it does
+ %% not treat it as a valid DISCONNECT packet. The Server uses DISCONNECT with Reason
+ %% Code 0x82 (Protocol Error) as described in section 4.13."
+ %% The last sentence does not make sense because the client already closed the network
+ %% connection after it sent us the DISCONNECT. Hence, we do not reply with another
+ %% DISCONNECT.
+ ?LOG_WARNING("MQTT protocol error: Ignoring requested Session Expiry "
+ "Interval ~p in DISCONNECT because it was 0 in CONNECT.",
+ [RequestedSEI, Props]),
+ State0;
+ _ ->
+ %% "The session expiry interval can be modified at disconnect."
+ {ok, MaxSEI} = application:get_env(?APP_NAME, max_session_expiry_interval_secs),
+ NewSEI = min(RequestedSEI, MaxSEI),
+ lists:foreach(fun(QName) ->
+ update_session_expiry_interval(QName, NewSEI)
+ end, existing_queue_names(State0)),
+ State0#state{cfg = Cfg#cfg{session_expiry_interval_secs = NewSEI}}
+ end,
{stop, {disconnect, client_initiated}, State}.
+-spec update_session_expiry_interval(rabbit_amqqueue:name(), session_expiry_interval()) -> ok.
+update_session_expiry_interval(QName, Expiry) ->
+ Fun = fun(Q) ->
+ Args0 = amqqueue:get_arguments(Q),
+ Args = if Expiry =:= infinity ->
+ proplists:delete(?QUEUE_TTL_KEY, Args0);
+ true ->
+ rabbit_misc:set_table_value(
+ Args0, ?QUEUE_TTL_KEY, long, timer:seconds(Expiry))
+ end,
+ amqqueue:set_arguments(Q, Args)
+ end,
+ case rabbit_amqqueue:update(QName, Fun) of
+ not_found ->
+ ok;
+ Q ->
+ ok = rabbit_queue_type:policy_changed(Q) % respects queue args
+ end.
+
check_protocol_version(ProtoVersion) ->
case lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)) of
true ->
@@ -478,7 +577,7 @@ check_credentials(Username, Password, SslLoginName, PeerIp) ->
-spec ensure_client_id(binary(), boolean(), protocol_version()) ->
{ok, binary()} | {error, reason_code()}.
-ensure_client_id(<<>>, _CleanSess = false, ProtoVer)
+ensure_client_id(<<>>, _CleanStart = false, ProtoVer)
when ProtoVer < 5 ->
?LOG_ERROR("MQTT client ID must be provided for non-clean session in MQTT v~b", [ProtoVer]),
{error, ?RC_CLIENT_IDENTIFIER_NOT_VALID};
@@ -564,16 +663,17 @@ self_consumes(Queue) ->
end, rabbit_amqqueue:consumers(Queue))
end.
-handle_clean_sess_qos0(State) ->
- handle_clean_sess(false, ?QOS_0, State).
+handle_clean_start_qos0(State) ->
+ handle_clean_start(false, ?QOS_0, State).
-handle_clean_sess_qos1(QoS0SessPresent, State) ->
- handle_clean_sess(QoS0SessPresent, ?QOS_1, State).
+handle_clean_start_qos1(QoS0SessPresent, State) ->
+ handle_clean_start(QoS0SessPresent, ?QOS_1, State).
-handle_clean_sess(_, QoS,
- State = #state{cfg = #cfg{clean_sess = true},
- auth_state = #auth_state{user = User = #user{username = Username},
- authz_ctx = AuthzCtx}}) ->
+handle_clean_start(
+ _, QoS,
+ State = #state{cfg = #cfg{clean_start = true},
+ auth_state = #auth_state{user = User = #user{username = Username},
+ authz_ctx = AuthzCtx}}) ->
%% "If the Server accepts a connection with CleanSession set to 1, the Server
%% MUST set Session Present to 0 in the CONNACK packet [MQTT-3.2.2-1].
SessPresent = false,
@@ -591,13 +691,15 @@ handle_clean_sess(_, QoS,
{error, ?RC_NOT_AUTHORIZED}
end
end;
-handle_clean_sess(SessPresent, QoS,
- State0 = #state{cfg = #cfg{clean_sess = false}}) ->
+handle_clean_start(SessPresent, QoS,
+ State0 = #state{cfg = #cfg{clean_start = false}}) ->
case get_queue(QoS, State0) of
{error, _} ->
%% Queue will be created later when client subscribes.
{ok, SessPresent, State0};
{ok, Q} ->
+ %%TODO modify Queue TTL if Session-Expiry-Interval is different
+ %% see update_session_expiry_interval/2
case consume(Q, QoS, State0) of
{ok, State} ->
{ok, _SessionPresent = true, State};
@@ -628,6 +730,7 @@ get_queue(QoS, State) ->
Err
end.
+-spec queue_name(qos(), state()) -> rabbit_amqqueue:name().
queue_name(?QOS_1, #state{cfg = #cfg{queue_qos1 = #resource{kind = queue} = Name}}) ->
Name;
queue_name(QoS, #state{cfg = #cfg{client_id = ClientId,
@@ -635,6 +738,19 @@ queue_name(QoS, #state{cfg = #cfg{client_id = ClientId,
QNameBin = rabbit_mqtt_util:queue_name_bin(ClientId, QoS),
rabbit_misc:r(VHost, queue, QNameBin).
+%% Returns names of queues that exist in the database.
+-spec existing_queue_names(state()) -> [rabbit_amqqueue:name()].
+existing_queue_names(State) ->
+ QNames = [queue_name(QoS, State) || QoS <- [?QOS_0, ?QOS_1]],
+ lists:filter(fun rabbit_amqqueue:exists/1, QNames).
+
+%% To save memory, we only store the queue_qos1 value in process state if there is a QoS 1 subscription.
+%% We store it in the process state such that we don't have to build the binary on every PUBACK we receive.
+maybe_set_queue_qos1(?QOS_1, State = #state{cfg = Cfg = #cfg{queue_qos1 = undefined}}) ->
+ State#state{cfg = Cfg#cfg{queue_qos1 = queue_name(?QOS_1, State)}};
+maybe_set_queue_qos1(_, State) ->
+ State.
+
%% Query subscriptions from the database and hold them in process state
%% to avoid future mnesia:match_object/3 queries.
cache_subscriptions(_SessionPresent = _SubscriptionsPresent = true,
@@ -978,7 +1094,7 @@ create_queue(
QoS, #state{cfg = #cfg{
vhost = VHost,
client_id = ClientId,
- clean_sess = CleanSess},
+ session_expiry_interval_secs = SessionExpiry},
auth_state = #auth_state{
user = User = #user{username = Username},
authz_ctx = AuthzCtx}
@@ -991,16 +1107,16 @@ create_queue(
case rabbit_vhost_limit:is_over_queue_limit(VHost) of
false ->
rabbit_core_metrics:queue_declared(QName),
- QArgs = queue_args(QoS, CleanSess),
+ QArgs = queue_args(QoS, SessionExpiry),
Q0 = amqqueue:new(QName,
self(),
_Durable = true,
_AutoDelete = false,
- queue_owner(CleanSess),
+ queue_owner(SessionExpiry),
QArgs,
VHost,
#{user => Username},
- queue_type(QoS, CleanSess, QArgs)
+ queue_type(QoS, SessionExpiry, QArgs)
),
case rabbit_queue_type:declare(Q0, node()) of
{new, Q} when ?is_amqqueue(Q) ->
@@ -1021,32 +1137,32 @@ create_queue(
E
end.
--spec queue_owner(CleanSession :: boolean()) ->
+-spec queue_owner(SessionExpiryInterval :: non_neg_integer()) ->
pid() | none.
-queue_owner(true) ->
- %% Exclusive queues are auto-deleted after node restart while auto-delete queues are not.
- %% Therefore make durable queue exclusive.
+queue_owner(0) ->
+ %% Session Expiry Interval set to 0 means that the Session ends when the Network
+ %% Connection is closed. Therefore we want the queue to be auto deleted.
+ %% Exclusive queues are auto deleted after node restart while auto-delete queues are not.
+ %% Therefore make the durable queue exclusive.
self();
-queue_owner(false) ->
+queue_owner(_) ->
none.
-queue_args(QoS, false) ->
- Args = case rabbit_mqtt_util:env(subscription_ttl) of
- Ms when is_integer(Ms) ->
- [{<<"x-expires">>, long, Ms}];
- _ ->
- []
+queue_args(_, 0) ->
+ [];
+queue_args(QoS, SessionExpiry) ->
+ Args = case SessionExpiry of
+ infinity -> [];
+ Secs -> [{?QUEUE_TTL_KEY, long, timer:seconds(Secs)}]
end,
case {QoS, rabbit_mqtt_util:env(durable_queue_type)} of
{?QOS_1, quorum} ->
[{<<"x-queue-type">>, longstr, <<"quorum">>} | Args];
_ ->
Args
- end;
-queue_args(_, _) ->
- [].
+ end.
-queue_type(?QOS_0, true, QArgs) ->
+queue_type(?QOS_0, 0, QArgs) ->
case rabbit_queue_type:is_enabled(?QUEUE_TYPE_QOS_0) of
true ->
?QUEUE_TYPE_QOS_0;
@@ -1102,13 +1218,6 @@ consume(Q, QoS, #state{
Err
end.
-%% To save memory, we only store the queue_qos1 value in process state if there is a QoS 1 subscription.
-%% We store it in the process state such that we don't have to build the binary on every PUBACK we receive.
-maybe_set_queue_qos1(?QOS_1, State = #state{cfg = Cfg = #cfg{queue_qos1 = undefined}}) ->
- State#state{cfg = Cfg#cfg{queue_qos1 = queue_name(?QOS_1, State)}};
-maybe_set_queue_qos1(_, State) ->
- State.
-
bind(QName, TopicName, State) ->
binding_action_with_checks(QName, TopicName, add, State).
@@ -1163,8 +1272,8 @@ publish_to_queues(
RoutingKey = mqtt_to_amqp(Topic),
Confirm = Qos > ?QOS_0,
{Expiration, Timestamp} = case Props of
- #{'Message-Expiry-Interval' := ExpirySeconds} ->
- {integer_to_binary(ExpirySeconds * 1000),
+ #{'Message-Expiry-Interval' := ExpirySecs} ->
+ {integer_to_binary(timer:seconds(ExpirySecs)),
os:system_time(second)};
_ ->
{undefined, undefined}
@@ -1349,7 +1458,7 @@ unregister_client(#state{cfg = #cfg{client_id = ClientIdBin}}) ->
end.
maybe_delete_mqtt_qos0_queue(
- State = #state{cfg = #cfg{clean_sess = true},
+ State = #state{cfg = #cfg{clean_start = true},
auth_state = #auth_state{user = #user{username = Username}}}) ->
case get_queue(?QOS_0, State) of
{ok, Q} ->
@@ -1816,7 +1925,11 @@ info(ssl_login_name, #state{cfg = #cfg{ssl_login_name = Val}}) -> Val;
info(user_who_performed_action, S) ->
info(user, S);
info(user, #state{auth_state = #auth_state{user = #user{username = Val}}}) -> Val;
-info(clean_sess, #state{cfg = #cfg{clean_sess = Val}}) -> Val;
+info(clean_sess, #state{cfg = #cfg{clean_start = CleanStart,
+ session_expiry_interval_secs = SEI}}) ->
+ %% "Setting Clean Start to 1 and a Session Expiry Interval of 0, is equivalent
+ %% to setting CleanSession to 1 in the MQTT Specification Version 3.1.1."
+ CleanStart andalso SEI =:= 0;
info(will_msg, #state{cfg = #cfg{will_msg = Val}}) -> Val;
info(retainer_pid, #state{cfg = #cfg{retainer_pid = Val}}) -> Val;
info(exchange, #state{cfg = #cfg{exchange = #resource{name = Val}}}) -> Val;
@@ -1977,7 +2090,8 @@ format_status(
cfg = #cfg{
socket = Socket,
proto_ver = ProtoVersion,
- clean_sess = CleanSess,
+ clean_start = CleanStart,
+ session_expiry_interval_secs = SessionExpiryInterval,
will_msg = WillMsg,
exchange = Exchange,
queue_qos1 = _,
@@ -1998,7 +2112,8 @@ format_status(
}}) ->
Cfg = #{socket => Socket,
proto_ver => ProtoVersion,
- clean_sess => CleanSess,
+ clean_start => CleanStart,
+ session_expiry_interval_secs => SessionExpiryInterval,
will_msg_defined => WillMsg =/= undefined,
exchange => Exchange,
published => Published,
diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl
index d2c16c1db2..e626a629fd 100644
--- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl
@@ -17,6 +17,7 @@
-define(FAIL_IF_CRASH_LOG, {["Generic server.*terminating"],
fun () -> ct:fail(crash_detected) end}).
-import(rabbit_ct_broker_helpers, [rpc/5]).
+-import(util, [non_clean_sess_opts/0]).
all() ->
[
@@ -564,7 +565,7 @@ no_queue_unbind_permission(Config) ->
{clientid, User},
{username, User},
{password, ?config(mqtt_password, Config)}],
- {ok, C1} = emqtt:start_link([{clean_start, false} | Opts]),
+ {ok, C1} = emqtt:start_link(non_clean_sess_opts() ++ Opts),
{ok, _} = emqtt:connect(C1),
Topic = <<"my/topic">>,
?assertMatch({ok, _Properties, [1]},
@@ -573,7 +574,7 @@ no_queue_unbind_permission(Config) ->
%% Revoke read access to amq.topic exchange.
rabbit_ct_broker_helpers:set_permissions(Config, User, Vhost, <<".*">>, <<".*">>, <<"^(?!amq\.topic$)">>),
- {ok, C2} = emqtt:start_link([{clean_start, false} | Opts]),
+ {ok, C2} = emqtt:start_link(non_clean_sess_opts() ++ Opts),
{ok, _} = emqtt:connect(C2),
process_flag(trap_exit, true),
%% We subscribe with the same client ID to the same topic again, but this time with QoS 0.
@@ -610,7 +611,7 @@ no_queue_delete_permission(Config) ->
?config(mqtt_password, Config),
Config,
ClientId,
- [{clean_start, false}]),
+ non_clean_sess_opts()),
{ok, _} = emqtt:connect(C1),
{ok, _, _} = emqtt:subscribe(C1, {<<"test/topic">>, qos1}),
ok = emqtt:disconnect(C1),
@@ -646,7 +647,7 @@ no_queue_consume_permission_on_connect(Config) ->
?config(mqtt_password, Config),
Config,
ClientId,
- [{clean_start, false}]),
+ non_clean_sess_opts()),
{ok, _} = emqtt:connect(C1),
{ok, _, _} = emqtt:subscribe(C1, {<<"test/topic">>, qos1}),
ok = emqtt:disconnect(C1),
@@ -657,7 +658,7 @@ no_queue_consume_permission_on_connect(Config) ->
?config(mqtt_password, Config),
Config,
ClientId,
- [{clean_start, false}]),
+ non_clean_sess_opts()),
unlink(C2),
?assertMatch({error, _},
emqtt:connect(C2)),
diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets
index 9ddbd45a45..56e648dcc5 100644
--- a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets
+++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets
@@ -5,7 +5,7 @@
mqtt.allow_anonymous = true
mqtt.vhost = /
mqtt.exchange = amq.topic
- mqtt.subscription_ttl = 1800000
+ mqtt.max_session_expiry_interval_secs = 1800
mqtt.prefetch = 10
mqtt.sparkplug = true
mqtt.listeners.ssl = none
@@ -24,7 +24,7 @@
{allow_anonymous,true},
{vhost,<<"/">>},
{exchange,<<"amq.topic">>},
- {subscription_ttl,1800000},
+ {max_session_expiry_interval_secs,1800},
{prefetch,10},
{sparkplug,true},
{ssl_listeners,[]},
@@ -104,7 +104,7 @@
mqtt.allow_anonymous = true
mqtt.vhost = /
mqtt.exchange = amq.topic
- mqtt.subscription_ttl = undefined
+ mqtt.max_session_expiry_interval_secs = infinity
mqtt.prefetch = 10
mqtt.proxy_protocol = true",
[{rabbit,[{tcp_listeners,[5672]}]},
@@ -114,7 +114,7 @@
{allow_anonymous,true},
{vhost,<<"/">>},
{exchange,<<"amq.topic">>},
- {subscription_ttl,undefined},
+ {max_session_expiry_interval_secs,infinity},
{prefetch,10},
{proxy_protocol,true}]}],
[rabbitmq_mqtt]},
@@ -124,7 +124,7 @@
mqtt.allow_anonymous = true
mqtt.vhost = /
mqtt.exchange = amq.topic
- mqtt.subscription_ttl = 1800000
+ mqtt.max_session_expiry_interval_secs = 1800
mqtt.prefetch = 10
## use DETS (disk-based) store for retained messages
mqtt.retained_message_store = rabbit_mqtt_retained_msg_store_dets
@@ -139,7 +139,7 @@
{allow_anonymous,true},
{vhost,<<"/">>},
{exchange,<<"amq.topic">>},
- {subscription_ttl,1800000},
+ {max_session_expiry_interval_secs,1800},
{prefetch,10},
{retained_message_store,rabbit_mqtt_retained_msg_store_dets},
{retained_message_store_dets_sync_interval,2000},
@@ -160,4 +160,4 @@
{max_packet_size_unauthenticated, 1234},
{max_packet_size_authenticated, 5678}]}],
[rabbitmq_mqtt]}
-]. \ No newline at end of file
+].
diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl
index 33eedc767d..daa7b22994 100644
--- a/deps/rabbitmq_mqtt/test/reader_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl
@@ -16,9 +16,10 @@
-import(util, [all_connection_pids/1,
publish_qos1_timeout/4,
expect_publishes/3,
- connect/2,
- connect/3,
- await_exit/1]).
+ connect/2, connect/3,
+ await_exit/1,
+ non_clean_sess_opts/0
+ ]).
all() ->
[
@@ -186,9 +187,9 @@ set_env(QueueType) ->
get_env() ->
rabbit_mqtt_util:env(durable_queue_type).
-validate_durable_queue_type(Config, ClientName, CleanSession, ExpectedQueueType) ->
+validate_durable_queue_type(Config, ClientName, Opts, ExpectedQueueType) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
- C = connect(ClientName, Config, [{clean_start, CleanSession}]),
+ C = connect(ClientName, Config, Opts),
{ok, _, _} = emqtt:subscribe(C, <<"TopicB">>, qos1),
ok = emqtt:publish(C, <<"TopicB">>, <<"Payload">>),
ok = expect_publishes(C, <<"TopicB">>, [<<"Payload">>]),
@@ -202,7 +203,8 @@ validate_durable_queue_type(Config, ClientName, CleanSession, ExpectedQueueType)
quorum_clean_session_false(Config) ->
Default = rpc(Config, reader_SUITE, get_env, []),
rpc(Config, reader_SUITE, set_env, [quorum]),
- validate_durable_queue_type(Config, <<"quorumCleanSessionFalse">>, false, rabbit_quorum_queue),
+ validate_durable_queue_type(
+ Config, <<"quorumCleanSessionFalse">>, non_clean_sess_opts(), rabbit_quorum_queue),
rpc(Config, reader_SUITE, set_env, [Default]).
quorum_clean_session_true(Config) ->
@@ -210,14 +212,17 @@ quorum_clean_session_true(Config) ->
rpc(Config, reader_SUITE, set_env, [quorum]),
%% Since we use a clean session and quorum queues cannot be auto-delete or exclusive,
%% we expect a classic queue.
- validate_durable_queue_type(Config, <<"quorumCleanSessionTrue">>, true, rabbit_classic_queue),
+ validate_durable_queue_type(
+ Config, <<"quorumCleanSessionTrue">>, [{clean_start, true}], rabbit_classic_queue),
rpc(Config, reader_SUITE, set_env, [Default]).
classic_clean_session_true(Config) ->
- validate_durable_queue_type(Config, <<"classicCleanSessionTrue">>, true, rabbit_classic_queue).
+ validate_durable_queue_type(
+ Config, <<"classicCleanSessionTrue">>, [{clean_start, true}], rabbit_classic_queue).
classic_clean_session_false(Config) ->
- validate_durable_queue_type(Config, <<"classicCleanSessionFalse">>, false, rabbit_classic_queue).
+ validate_durable_queue_type(
+ Config, <<"classicCleanSessionFalse">>, non_clean_sess_opts(), rabbit_classic_queue).
event_authentication_failure(Config) ->
{ok, C} = emqtt:start_link(
diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl
index c45c425112..cff0ce85c4 100644
--- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl
@@ -41,7 +41,8 @@
connect/2, connect/3, connect/4,
get_events/1, assert_event_type/2, assert_event_prop/2,
await_exit/1, await_exit/2,
- publish_qos1_timeout/4]).
+ publish_qos1_timeout/4,
+ non_clean_sess_opts/0]).
-import(rabbit_mgmt_test_util,
[http_get/2,
http_delete/3]).
@@ -81,7 +82,7 @@ cluster_size_1_tests() ->
global_counters %% must be the 1st test case
,block_only_publisher
,many_qos1_messages
- ,subscription_ttl
+ ,session_expiry
,management_plugin_connection
,management_plugin_enable
,disconnect
@@ -650,12 +651,12 @@ consuming_classic_mirrored_queue_down(Config) ->
{<<"queue-master-locator">>, <<"client-local">>}]),
%% Declare queue leader on Server1.
- C1 = connect(ClientId, Config, Server1, [{clean_start, false}]),
+ C1 = connect(ClientId, Config, Server1, non_clean_sess_opts()),
{ok, _, _} = emqtt:subscribe(C1, Topic, qos1),
ok = emqtt:disconnect(C1),
%% Consume from Server2.
- C2 = connect(ClientId, Config, Server2, [{clean_start, false}]),
+ C2 = connect(ClientId, Config, Server2, non_clean_sess_opts()),
%% Sanity check that consumption works.
{ok, _} = emqtt:publish(C2, Topic, <<"m1">>, qos1),
@@ -685,12 +686,12 @@ consuming_classic_queue_down(Config) ->
ClientId = Topic = atom_to_binary(?FUNCTION_NAME),
%% Declare classic queue on Server1.
- C1 = connect(ClientId, Config, [{clean_start, false}]),
+ C1 = connect(ClientId, Config, non_clean_sess_opts()),
{ok, _, _} = emqtt:subscribe(C1, Topic, qos1),
ok = emqtt:disconnect(C1),
%% Consume from Server3.
- C2 = connect(ClientId, Config, Server3, [{clean_start, false}]),
+ C2 = connect(ClientId, Config, Server3, non_clean_sess_opts()),
ProtoVer = ?config(mqtt_version, Config),
?assertMatch(#{consumers := 1},
@@ -776,20 +777,20 @@ delete_create_queue(Config) ->
delete_queue(Ch, [CQ1, CQ2, QQ]),
ok = emqtt:disconnect(C).
-subscription_ttl(Config) ->
- TTL = 1000,
+session_expiry(Config) ->
App = rabbitmq_mqtt,
- Par = ClientId = ?FUNCTION_NAME,
+ Par = max_session_expiry_interval_secs,
+ Seconds = 1,
{ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]),
- ok = rpc(Config, application, set_env, [App, Par, TTL]),
+ ok = rpc(Config, application, set_env, [App, Par, Seconds]),
- C = connect(ClientId, Config, [{clean_start, false}]),
+ C = connect(?FUNCTION_NAME, Config, non_clean_sess_opts()),
{ok, _, [0, 1]} = emqtt:subscribe(C, [{<<"topic0">>, qos0},
{<<"topic1">>, qos1}]),
ok = emqtt:disconnect(C),
?assertEqual(2, rpc(Config, rabbit_amqqueue, count, [])),
- timer:sleep(TTL + 100),
+ timer:sleep(timer:seconds(Seconds) + 100),
?assertEqual(0, rpc(Config, rabbit_amqqueue, count, [])),
ok = rpc(Config, application, set_env, [App, Par, DefaultVal]).
@@ -804,7 +805,7 @@ non_clean_sess_reconnect(Config, SubscriptionQoS) ->
Pub = connect(<<"publisher">>, Config),
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
- C1 = connect(ClientId, Config, [{clean_start, false}]),
+ C1 = connect(ClientId, Config, non_clean_sess_opts()),
{ok, _, _} = emqtt:subscribe(C1, Topic, SubscriptionQoS),
?assertMatch(#{consumers := 1},
get_global_counters(Config)),
@@ -817,7 +818,10 @@ non_clean_sess_reconnect(Config, SubscriptionQoS) ->
ok = emqtt:publish(Pub, Topic, <<"msg-3-qos0">>, qos0),
{ok, _} = emqtt:publish(Pub, Topic, <<"msg-4-qos1">>, qos1),
- C2 = connect(ClientId, Config, [{clean_start, false}]),
+ C2 = connect(ClientId, Config, non_clean_sess_opts()),
+ %% Server should reply in CONNACK that it has session state.
+ ?assertEqual({session_present, 1},
+ proplists:lookup(session_present, emqtt:info(C2))),
?assertMatch(#{consumers := 1},
get_global_counters(Config)),
@@ -851,7 +855,7 @@ non_clean_sess_reconnect_qos0_and_qos1(Config) ->
Topic1 = <<"t/1">>,
ClientId = ?FUNCTION_NAME,
- C1 = connect(ClientId, Config, [{clean_start, false}]),
+ C1 = connect(ClientId, Config, non_clean_sess_opts()),
{ok, _, [1, 0]} = emqtt:subscribe(C1, [{Topic1, qos1}, {Topic0, qos0}]),
?assertMatch(#{consumers := 1},
get_global_counters(Config)),
@@ -863,7 +867,7 @@ non_clean_sess_reconnect_qos0_and_qos1(Config) ->
{ok, _} = emqtt:publish(Pub, Topic0, <<"msg-0">>, qos1),
{ok, _} = emqtt:publish(Pub, Topic1, <<"msg-1">>, qos1),
- C2 = connect(ClientId, Config, [{clean_start, false}]),
+ C2 = connect(ClientId, Config, non_clean_sess_opts()),
?assertMatch(#{consumers := 1},
get_global_counters(Config)),
@@ -876,7 +880,7 @@ non_clean_sess_reconnect_qos0_and_qos1(Config) ->
ok = emqtt:disconnect(C3).
non_clean_sess_empty_client_id(Config) ->
- {C, Connect} = util:start_client(<<>>, Config, 0, [{clean_start, false}]),
+ {C, Connect} = util:start_client(<<>>, Config, 0, non_clean_sess_opts()),
case ?config(mqtt_version, Config) of
V when V =:= v3;
V =:= v4 ->
@@ -918,7 +922,7 @@ subscribe_same_topic_same_qos(Config) ->
ok = emqtt:disconnect(C).
subscribe_same_topic_different_qos(Config) ->
- C = connect(?FUNCTION_NAME, Config, [{clean_start, false}]),
+ C = connect(?FUNCTION_NAME, Config, non_clean_sess_opts()),
Topic = <<"b/c">>,
{ok, _} = emqtt:publish(C, Topic, <<"retained">>, [{retain, true},
@@ -1466,7 +1470,7 @@ default_queue_type(Config) ->
%% Test that the configured default queue type does not apply to MQTT.
Creds = [{username, <<Vhost/binary, ":guest">>},
{password, <<"guest">>}],
- C1 = connect(ClientId, Config, [{clean_start, false} | Creds]),
+ C1 = connect(ClientId, Config, Creds ++ non_clean_sess_opts()),
{ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1),
ClassicQueues = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_classic_queue]),
?assertEqual(1, length(ClassicQueues)),
diff --git a/deps/rabbitmq_mqtt/test/util.erl b/deps/rabbitmq_mqtt/test/util.erl
index 97d80a2bdb..2b4d27c085 100644
--- a/deps/rabbitmq_mqtt/test/util.erl
+++ b/deps/rabbitmq_mqtt/test/util.erl
@@ -24,7 +24,8 @@
assert_message_expiry_interval/2,
await_exit/1,
await_exit/2,
- maybe_skip_v5/1
+ maybe_skip_v5/1,
+ non_clean_sess_opts/0
]).
all_connection_pids(Config) ->
@@ -162,6 +163,13 @@ maybe_skip_v5(Config) ->
Config
end.
+%% "CleanStart=0 and SessionExpiry=0xFFFFFFFF (UINT_MAX) for
+%% MQTT 5.0 would provide the same as CleanSession=0 for 3.1.1."
+%% https://issues.oasis-open.org/projects/MQTT/issues/MQTT-538
+non_clean_sess_opts() ->
+ [{clean_start, false},
+ {properties, #{'Session-Expiry-Interval' => 16#FFFFFFFF}}].
+
connect(ClientId, Config) ->
connect(ClientId, Config, []).
diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl
index 78e8223815..e863edde90 100644
--- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl
@@ -17,8 +17,15 @@
[
start_client/4,
connect/2, connect/3, connect/4,
- assert_message_expiry_interval/2
+ assert_message_expiry_interval/2,
+ non_clean_sess_opts/0
]).
+-import(rabbit_ct_broker_helpers,
+ [rpc/4]).
+-import(rabbit_ct_helpers,
+ [eventually/1]).
+
+-define(QUEUE_TTL_KEY, <<"x-expires">>).
all() ->
[{group, mqtt},
@@ -27,11 +34,13 @@ all() ->
groups() ->
[
{mqtt, [],
- [{cluster_size_1, [shuffle], cluster_size_1_tests()},
- {cluster_size_3, [shuffle], cluster_size_3_tests()}]},
+ [{cluster_size_1, [shuffle], cluster_size_1_tests()}
+ % {cluster_size_3, [shuffle], cluster_size_3_tests()}
+ ]},
{web_mqtt, [],
- [{cluster_size_1, [shuffle], cluster_size_1_tests()},
- {cluster_size_3, [shuffle], cluster_size_3_tests()}]}
+ [{cluster_size_1, [shuffle], cluster_size_1_tests()}
+ % {cluster_size_3, [shuffle], cluster_size_3_tests()}
+ ]}
].
cluster_size_1_tests() ->
@@ -42,15 +51,19 @@ cluster_size_1_tests() ->
message_expiry_interval,
message_expiry_interval_will_message,
message_expiry_interval_retained_message,
+ session_expiry_interval_classic_queue_disconnect_decrease,
+ session_expiry_interval_quorum_queue_disconnect_decrease,
+ session_expiry_interval_disconnect_zero_to_non_zero,
+ session_expiry_interval_disconnect_non_zero_to_zero,
+ session_expiry_interval_disconnect_infinity_to_zero,
+ session_expiry_interval_disconnect_to_infinity,
client_publish_qos2,
client_rejects_publish,
will_qos2
].
-cluster_size_3_tests() ->
- [
- satisfy_bazel
- ].
+% cluster_size_3_tests() ->
+% [].
suite() ->
[{timetrap, {minutes, 1}}].
@@ -86,7 +99,8 @@ init_per_group(Group, Config0) ->
tcp_port_mqtt_tls_extra]}]),
Config2 = rabbit_ct_helpers:merge_app_env(
Config1,
- {rabbit, [{classic_queue_default_version, 2}]}),
+ {rabbit, [{classic_queue_default_version, 2},
+ {quorum_tick_interval, 200}]}),
Config = rabbit_ct_helpers:run_steps(
Config2,
rabbit_ct_broker_helpers:setup_steps() ++
@@ -148,7 +162,7 @@ message_expiry_interval(Config) ->
NumExpiredBefore = dead_letter_metric(messages_dead_lettered_expired_total, Config),
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
Pub = connect(<<"publisher">>, Config),
- Sub1 = connect(ClientId, Config, [{clean_start, false}]),
+ Sub1 = connect(ClientId, Config, non_clean_sess_opts()),
{ok, _, [1]} = emqtt:subscribe(Sub1, Topic, qos1),
ok = emqtt:disconnect(Sub1),
@@ -157,7 +171,7 @@ message_expiry_interval(Config) ->
{ok, _} = emqtt:publish(Pub, Topic, #{'Message-Expiry-Interval' => 10}, <<"m3">>, [{qos, 1}]),
{ok, _} = emqtt:publish(Pub, Topic, #{'Message-Expiry-Interval' => 2}, <<"m4">>, [{qos, 1}]),
timer:sleep(2001),
- Sub2 = connect(ClientId, Config, [{clean_start, false}]),
+ Sub2 = connect(ClientId, Config, non_clean_sess_opts()),
receive {publish, #{client_pid := Sub2,
topic := Topic,
payload := <<"m2">>,
@@ -197,7 +211,7 @@ message_expiry_interval_will_message(Config) ->
timer:sleep(100),
[ServerPublisherPid] = util:all_connection_pids(Config),
- Sub1 = connect(ClientId, Config, [{clean_start, false}]),
+ Sub1 = connect(ClientId, Config, non_clean_sess_opts()),
{ok, _, [1]} = emqtt:subscribe(Sub1, Topic, qos1),
ok = emqtt:disconnect(Sub1),
@@ -264,6 +278,117 @@ message_expiry_interval_retained_message(Config) ->
ok = emqtt:disconnect(Pub),
ok = emqtt:disconnect(Sub).
+session_expiry_interval_classic_queue_disconnect_decrease(Config) ->
+ ok = session_expiry_interval_disconnect_decrease(rabbit_classic_queue, Config).
+
+session_expiry_interval_quorum_queue_disconnect_decrease(Config) ->
+ ok = rpc(Config, application, set_env, [rabbitmq_mqtt, durable_queue_type, quorum]),
+ ok = session_expiry_interval_disconnect_decrease(rabbit_quorum_queue, Config),
+ ok = rpc(Config, application, unset_env, [rabbitmq_mqtt, durable_queue_type]).
+
+session_expiry_interval_disconnect_decrease(QueueType, Config) ->
+ ClientId = ?FUNCTION_NAME,
+ C1 = connect(ClientId, Config, [{properties, #{'Session-Expiry-Interval' => 100}}]),
+ {ok, _, [1]} = emqtt:subscribe(C1, <<"t/1">>, qos1),
+
+ [Q1] = rpc(Config, rabbit_amqqueue, list, []),
+ ?assertEqual(QueueType,
+ amqqueue:get_type(Q1)),
+ ?assertEqual({long, 100_000},
+ rabbit_misc:table_lookup(amqqueue:get_arguments(Q1), ?QUEUE_TTL_KEY)),
+
+ %% DISCONNECT decreases Session Expiry Interval from 100 seconds to 1 second.
+ ok = emqtt:disconnect(C1, _RcNormalDisconnection = 0, #{'Session-Expiry-Interval' => 1}),
+ [Q2] = rpc(Config, rabbit_amqqueue, list, []),
+ ?assertEqual({long, 1_000},
+ rabbit_misc:table_lookup(amqqueue:get_arguments(Q2), ?QUEUE_TTL_KEY)),
+
+ timer:sleep(1500),
+ C2 = connect(ClientId, Config, [{clean_start, false}]),
+ %% Server should reply in CONNACK that it does not have session state for our client ID.
+ ?assertEqual({session_present, 0},
+ proplists:lookup(session_present, emqtt:info(C2))),
+ ok = emqtt:disconnect(C2).
+
+session_expiry_interval_disconnect_zero_to_non_zero(Config) ->
+ ClientId = ?FUNCTION_NAME,
+ C1 = connect(ClientId, Config, [{properties, #{'Session-Expiry-Interval' => 0}}]),
+ {ok, _, [1]} = emqtt:subscribe(C1, <<"t/1">>, qos1),
+ %% "If the Session Expiry Interval in the CONNECT packet was zero, then it is a Protocol
+ %% Error to set a non-zero Session Expiry Interval in the DISCONNECT packet sent by the Client.
+ ok = emqtt:disconnect(C1, _RcNormalDisconnection = 0, #{'Session-Expiry-Interval' => 60}),
+ C2 = connect(ClientId, Config, [{clean_start, false}]),
+ %% Due to the prior protocol error, we expect the requested session expiry interval of
+ %% 60 seconds not to be applied. Therefore, the server should reply in CONNACK that
+ %% it does not have session state for our client ID.
+ ?assertEqual({session_present, 0},
+ proplists:lookup(session_present, emqtt:info(C2))),
+ ok = emqtt:disconnect(C2).
+
+session_expiry_interval_disconnect_non_zero_to_zero(Config) ->
+ ClientId = ?FUNCTION_NAME,
+ C1 = connect(ClientId, Config, [{properties, #{'Session-Expiry-Interval' => 60}}]),
+ {ok, _, [0, 1]} = emqtt:subscribe(C1, [{<<"t/0">>, qos0},
+ {<<"t/1">>, qos1}]),
+ ?assertEqual(2, rpc(Config, rabbit_amqqueue, count, [])),
+ ok = emqtt:disconnect(C1, _RcNormalDisconnection = 0, #{'Session-Expiry-Interval' => 0}),
+ eventually(?_assertEqual(0, rpc(Config, rabbit_amqqueue, count, []))),
+ C2 = connect(ClientId, Config, [{clean_start, false}]),
+ ?assertEqual({session_present, 0},
+ proplists:lookup(session_present, emqtt:info(C2))),
+ ok = emqtt:disconnect(C2).
+
+session_expiry_interval_disconnect_infinity_to_zero(Config) ->
+ App = rabbitmq_mqtt,
+ Par = max_session_expiry_interval_secs,
+ Default = rpc(Config, application, get_env, [App, Par]),
+ ok = rpc(Config, application, set_env, [App, Par, infinity]),
+ ClientId = ?FUNCTION_NAME,
+
+ C1 = connect(ClientId, Config, [{properties, #{'Session-Expiry-Interval' => 16#FFFFFFFF}}]),
+ {ok, _, [1, 0]} = emqtt:subscribe(C1, [{<<"t/1">>, qos1},
+ {<<"t/0">>, qos0}]),
+ Qs = rpc(Config, rabbit_amqqueue, list, []),
+ ?assertEqual(2, length(Qs)),
+ ?assertNot(lists:any(fun(Q) ->
+ proplists:is_defined(?QUEUE_TTL_KEY, amqqueue:get_arguments(Q))
+ end, Qs)),
+
+ ok = emqtt:disconnect(C1, _RcNormalDisconnection = 0, #{'Session-Expiry-Interval' => 0}),
+ eventually(?_assertEqual(0, rpc(Config, rabbit_amqqueue, count, []))),
+ ok = rpc(Config, application, set_env, [App, Par, Default]).
+
+session_expiry_interval_disconnect_to_infinity(Config) ->
+ App = rabbitmq_mqtt,
+ Par = max_session_expiry_interval_secs,
+ Default = rpc(Config, application, get_env, [App, Par]),
+ ok = rpc(Config, application, set_env, [App, Par, infinity]),
+ ClientId = ?FUNCTION_NAME,
+
+ %% Connect with a non-zero and non-infinity Session Expiry Interval.
+ C1 = connect(ClientId, Config, [{properties, #{'Session-Expiry-Interval' => 1}}]),
+ {ok, _, [0, 1]} = emqtt:subscribe(C1, [{<<"t/0">>, qos0},
+ {<<"t/1">>, qos1}]),
+ Qs1 = rpc(Config, rabbit_amqqueue, list, []),
+ ?assertEqual(2, length(Qs1)),
+ ?assert(lists:all(fun(Q) ->
+ {long, 1000} =:= rabbit_misc:table_lookup(
+ amqqueue:get_arguments(Q), ?QUEUE_TTL_KEY)
+ end, Qs1)),
+
+ %% Disconnect with infinity should remove queue TTL from both queues.
+ ok = emqtt:disconnect(C1, _RcNormalDisconnection = 0, #{'Session-Expiry-Interval' => 16#FFFFFFFF}),
+ timer:sleep(100),
+ Qs2 = rpc(Config, rabbit_amqqueue, list, []),
+ ?assertEqual(2, length(Qs2)),
+ ?assertNot(lists:any(fun(Q) ->
+ proplists:is_defined(?QUEUE_TTL_KEY, amqqueue:get_arguments(Q))
+ end, Qs2)),
+
+ C2 = connect(ClientId, Config, [{clean_start, true}]),
+ ok = emqtt:disconnect(C2),
+ ok = rpc(Config, application, set_env, [App, Par, Default]).
+
client_publish_qos2(Config) ->
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
{C, Connect} = start_client(ClientId, Config, 0, []),
@@ -302,11 +427,8 @@ will_qos2(Config) ->
unlink(C),
?assertEqual({error, {qos_not_supported, #{}}}, Connect(C)).
-satisfy_bazel(_Config) ->
- ok.
-
dead_letter_metric(Metric, Config) ->
- Counters = rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, []),
+ Counters = rpc(Config, rabbit_global_counters, overview, []),
Map = maps:get([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, disabled}], Counters),
maps:get(Metric, Map).