summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2022-10-24 15:50:32 +0100
committerMergify <37929162+mergify[bot]@users.noreply.github.com>2022-10-25 10:54:06 +0000
commit6a42c8a34a02326b5fc650d1f97033c867e3a760 (patch)
tree62251d73ad3d4d8fef253c54f7f3a5f37da7970e
parent00b6c9166d2d9e5421c97fd2dcabaa1c66256e12 (diff)
downloadrabbitmq-server-git-6a42c8a34a02326b5fc650d1f97033c867e3a760.tar.gz
AMQP 1.0: Support the modified outcome
Some client libraries (QPid) will automatically send a disposition with the 'modified' outcome in response to a client local message TTL expiry. To support this case and others we treat 'modified' the same as 'accepted' and simply ack the message back to the queue. This change also contains some API extensions to the amqp10_client to better support sending the various delivery states (outcomes). (cherry picked from commit 802688a8ab6c3a0d55576e68a3605ed6541025f7)
-rw-r--r--deps/amqp10_client/src/amqp10_client.erl14
-rw-r--r--deps/amqp10_client/src/amqp10_client.hrl3
-rw-r--r--deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl5
-rw-r--r--deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl8
-rw-r--r--deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl114
-rwxr-xr-xdeps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs1
6 files changed, 122 insertions, 23 deletions
diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl
index 785f44f005..a848818554 100644
--- a/deps/amqp10_client/src/amqp10_client.erl
+++ b/deps/amqp10_client/src/amqp10_client.erl
@@ -32,6 +32,7 @@
detach_link/1,
send_msg/2,
accept_msg/2,
+ settle_msg/3,
flow_link_credit/3,
flow_link_credit/4,
echo/1,
@@ -335,11 +336,18 @@ send_msg(#link_ref{role = sender, session = Session,
%% @doc Accept a message on a the link referred to be the 'LinkRef'.
-spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok.
-accept_msg(#link_ref{role = receiver, session = Session}, Msg) ->
+accept_msg(LinkRef, Msg) ->
+ settle_msg(LinkRef, Msg, accepted).
+
+%% @doc Settle a message on a the link referred to be the 'LinkRef' using
+%% the chosen delivery state.
+-spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(),
+ amqp10_client_types:delivery_state()) -> ok.
+settle_msg(#link_ref{role = receiver,
+ session = Session}, Msg, Settlement) ->
DeliveryId = amqp10_msg:delivery_id(Msg),
amqp10_client_session:disposition(Session, receiver, DeliveryId,
- DeliveryId, true, accepted).
-
+ DeliveryId, true, Settlement).
%% @doc Get a single message from a link.
%% Flows a single link credit then awaits delivery or timeout.
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.
diff --git a/deps/amqp10_client/src/amqp10_client.hrl b/deps/amqp10_client/src/amqp10_client.hrl
index 38627f80c5..becbd75861 100644
--- a/deps/amqp10_client/src/amqp10_client.hrl
+++ b/deps/amqp10_client/src/amqp10_client.hrl
@@ -20,5 +20,6 @@
-define(DBG(F, A), ok).
-endif.
--record(link_ref, {role :: sender | receiver, session :: pid(),
+-record(link_ref, {role :: sender | receiver,
+ session :: pid(),
link_handle :: non_neg_integer()}).
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl
index 48bc4ba932..82838a9ba1 100644
--- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl
+++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl
@@ -14,14 +14,11 @@
-define(EXCHANGE_SUB_LIFETIME, "delete-on-close").
-define(DEFAULT_OUTCOME, #'v1_0.released'{}).
--define(SUPPORTED_OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED,
- ?V_1_0_SYMBOL_REJECTED,
- ?V_1_0_SYMBOL_RELEASED]).
-
-define(OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED,
?V_1_0_SYMBOL_REJECTED,
?V_1_0_SYMBOL_RELEASED,
?V_1_0_SYMBOL_MODIFIED]).
+-define(SUPPORTED_OUTCOMES, ?OUTCOMES).
outcomes(Source) ->
{DefaultOutcome, Outcomes} =
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl
index 71bba9ce5d..8c69effe84 100644
--- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl
+++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl
@@ -277,6 +277,14 @@ handle_control(#'v1_0.disposition'{state = Outcome,
#'v1_0.accepted'{} ->
#'basic.ack'{delivery_tag = DeliveryTag,
multiple = false};
+ %% we don't care if the client modified the
+ %% so just treat it as accepted.
+ %% Some clients send modified instead of accepted
+ %% when e.g. a client
+ %% side message TTL expires.
+ #'v1_0.modified'{} ->
+ #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false};
#'v1_0.rejected'{} ->
#'basic.reject'{delivery_tag = DeliveryTag,
requeue = false};
diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
index aedd4873eb..fc3f7d6f1f 100644
--- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
+++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
@@ -11,6 +11,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-compile(nowarn_export_all).
-compile(export_all).
all() ->
@@ -22,6 +23,7 @@ all() ->
groups() ->
[
{tests, [], [
+ reliable_send_receive_with_outcomes,
roundtrip_quorum_queue_with_drain,
message_headers_conversion
]},
@@ -68,6 +70,75 @@ end_per_testcase(Testcase, Config) ->
%%% TESTS
%%%
+reliable_send_receive_with_outcomes(Config) ->
+ Outcomes = [accepted,
+ modified,
+ rejected,
+ released],
+ [begin
+ ct:pal("~s testing ~s", [?FUNCTION_NAME, Outcome]),
+ reliable_send_receive(Config, Outcome)
+ end || Outcome <- Outcomes],
+ ok.
+
+reliable_send_receive(Config, Outcome) ->
+ Container = atom_to_binary(?FUNCTION_NAME, utf8),
+ OutcomeBin = atom_to_binary(Outcome, utf8),
+ QName = <<Container/binary, OutcomeBin/binary>>,
+ %% declare a quorum queue
+ Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ durable = true,
+ arguments = [{<<"x-queue-type">>,
+ longstr, <<"quorum">>}]}),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ %% reliable send and consume
+ Host = ?config(rmq_hostname, Config),
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ Address = <<"/amq/queue/", QName/binary>>,
+
+ OpnConf = #{address => Host,
+ port => Port,
+ container_id => Container,
+ sasl => {plain, <<"guest">>, <<"guest">>}},
+ {ok, Connection} = amqp10_client:open_connection(OpnConf),
+ {ok, Session} = amqp10_client:begin_session(Connection),
+ SenderLinkName = <<"test-sender">>,
+ {ok, Sender} = amqp10_client:attach_sender_link(Session,
+ SenderLinkName,
+ Address),
+ ok = wait_for_credit(Sender),
+ DTag1 = <<"dtag-1">>,
+ %% create an unsettled message,
+ %% link will be in "mixed" mode by default
+ Msg1 = amqp10_msg:new(DTag1, <<"body-1">>, false),
+ ok = amqp10_client:send_msg(Sender, Msg1),
+ ok = wait_for_settlement(DTag1),
+
+ ok = amqp10_client:detach_link(Sender),
+ ok = amqp10_client:close_connection(Connection),
+ flush("post sender close"),
+
+ {ok, Connection2} = amqp10_client:open_connection(OpnConf),
+ {ok, Session2} = amqp10_client:begin_session(Connection2),
+ ReceiverLinkName = <<"test-receiver">>,
+ {ok, Receiver} = amqp10_client:attach_receiver_link(Session2,
+ ReceiverLinkName,
+ Address,
+ unsettled),
+ {ok, Msg} = amqp10_client:get_msg(Receiver),
+
+ ct:pal("got ~p", [amqp10_msg:body(Msg)]),
+
+ ok = amqp10_client:settle_msg(Receiver, Msg, Outcome),
+
+ flush("post accept"),
+
+ ok = amqp10_client:detach_link(Receiver),
+ ok = amqp10_client:close_connection(Connection2),
+
+ ok.
+
roundtrip_quorum_queue_with_drain(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
@@ -83,7 +154,7 @@ roundtrip_quorum_queue_with_drain(Config) ->
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},
-
+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
@@ -141,18 +212,18 @@ message_headers_conversion(Config) ->
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
-
+
rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_amqp091_headers_to_app_props, true]),
rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_app_props_to_amqp091_headers, true]),
-
+
OpnConf = #{address => Host,
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},
-
+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
-
+
amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address),
amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address),
@@ -173,7 +244,7 @@ amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) ->
wait_for_accepts(1),
{ok, Headers} = amqp091_get_msg_headers(Ch, QName),
-
+
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)),
?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)),
?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)).
@@ -252,22 +323,35 @@ open_and_close_connection(OpnConf) ->
ok = amqp10_client:close_connection(Connection).
% before we can send messages we have to wait for credit from the server
-wait_for_credit(Sender) ->
+wait_for_credit(Sender) ->
+ receive
+ {amqp10_event, {link, Sender, credited}} ->
+ flush(?FUNCTION_NAME),
+ ok
+ after 5000 ->
+ flush("wait_for_credit timed out"),
+ ct:fail(credited_timeout)
+ end.
+
+wait_for_settlement(Tag) ->
receive
- {amqp10_event, {link, Sender, credited}} ->
+ {amqp10_disposition, {accepted, Tag}} ->
+ flush(?FUNCTION_NAME),
ok
after 5000 ->
- flush("Credit timed out"),
- exit(credited_timeout)
+ flush("wait_for_settlement timed out"),
+ ct:fail(credited_timeout)
end.
wait_for_accepts(0) -> ok;
-wait_for_accepts(N) ->
- receive
- {amqp10_disposition,{accepted,_}} -> wait_for_accepts(N -1)
- after 250 ->
- ok
+wait_for_accepts(N) ->
+ receive
+ {amqp10_disposition,{accepted,_}} ->
+ wait_for_accepts(N -1)
+ after 250 ->
+ ok
end.
+
delete_queue(Config, QName) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
_ = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
diff --git a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs
index 6f083c2309..84ed881cc2 100755
--- a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs
+++ b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs
@@ -190,6 +190,7 @@ module Test =
let q = "roundtrip-091-q"
let corr = "corrlation"
let sender = SenderLink(c.Session, q + "-sender" , q)
+
new Message("hi"B, Header = Header(),
Properties = new Properties(CorrelationId = corr))
|> sender.Send