summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2022-10-25 15:10:41 +0400
committerGitHub <noreply@github.com>2022-10-25 15:10:41 +0400
commit2b404b579b58f9dcd9d309f149e62cf52d14480b (patch)
tree2d3834da036ea515523345e8cf830d33e09371dc
parent6d64d2fc2ca94d55c67d120a6be2a2357a2094e8 (diff)
parent6a42c8a34a02326b5fc650d1f97033c867e3a760 (diff)
downloadrabbitmq-server-git-2b404b579b58f9dcd9d309f149e62cf52d14480b.tar.gz
Merge pull request #6241 from rabbitmq/mergify/bp/v3.11.x/pr-6228
AMQP 1.0: Support the modified outcome (backport #6228)
-rw-r--r--deps/amqp10_client/src/amqp10_client.erl14
-rw-r--r--deps/amqp10_client/src/amqp10_client.hrl3
-rw-r--r--deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl5
-rw-r--r--deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl8
-rw-r--r--deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl114
-rwxr-xr-xdeps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs1
6 files changed, 122 insertions, 23 deletions
diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl
index 785f44f005..a848818554 100644
--- a/deps/amqp10_client/src/amqp10_client.erl
+++ b/deps/amqp10_client/src/amqp10_client.erl
@@ -32,6 +32,7 @@
detach_link/1,
send_msg/2,
accept_msg/2,
+ settle_msg/3,
flow_link_credit/3,
flow_link_credit/4,
echo/1,
@@ -335,11 +336,18 @@ send_msg(#link_ref{role = sender, session = Session,
%% @doc Accept a message on a the link referred to be the 'LinkRef'.
-spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok.
-accept_msg(#link_ref{role = receiver, session = Session}, Msg) ->
+accept_msg(LinkRef, Msg) ->
+ settle_msg(LinkRef, Msg, accepted).
+
+%% @doc Settle a message on a the link referred to be the 'LinkRef' using
+%% the chosen delivery state.
+-spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(),
+ amqp10_client_types:delivery_state()) -> ok.
+settle_msg(#link_ref{role = receiver,
+ session = Session}, Msg, Settlement) ->
DeliveryId = amqp10_msg:delivery_id(Msg),
amqp10_client_session:disposition(Session, receiver, DeliveryId,
- DeliveryId, true, accepted).
-
+ DeliveryId, true, Settlement).
%% @doc Get a single message from a link.
%% Flows a single link credit then awaits delivery or timeout.
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.
diff --git a/deps/amqp10_client/src/amqp10_client.hrl b/deps/amqp10_client/src/amqp10_client.hrl
index 38627f80c5..becbd75861 100644
--- a/deps/amqp10_client/src/amqp10_client.hrl
+++ b/deps/amqp10_client/src/amqp10_client.hrl
@@ -20,5 +20,6 @@
-define(DBG(F, A), ok).
-endif.
--record(link_ref, {role :: sender | receiver, session :: pid(),
+-record(link_ref, {role :: sender | receiver,
+ session :: pid(),
link_handle :: non_neg_integer()}).
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl
index 48bc4ba932..82838a9ba1 100644
--- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl
+++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl
@@ -14,14 +14,11 @@
-define(EXCHANGE_SUB_LIFETIME, "delete-on-close").
-define(DEFAULT_OUTCOME, #'v1_0.released'{}).
--define(SUPPORTED_OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED,
- ?V_1_0_SYMBOL_REJECTED,
- ?V_1_0_SYMBOL_RELEASED]).
-
-define(OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED,
?V_1_0_SYMBOL_REJECTED,
?V_1_0_SYMBOL_RELEASED,
?V_1_0_SYMBOL_MODIFIED]).
+-define(SUPPORTED_OUTCOMES, ?OUTCOMES).
outcomes(Source) ->
{DefaultOutcome, Outcomes} =
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl
index 71bba9ce5d..8c69effe84 100644
--- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl
+++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl
@@ -277,6 +277,14 @@ handle_control(#'v1_0.disposition'{state = Outcome,
#'v1_0.accepted'{} ->
#'basic.ack'{delivery_tag = DeliveryTag,
multiple = false};
+ %% we don't care if the client modified the
+ %% so just treat it as accepted.
+ %% Some clients send modified instead of accepted
+ %% when e.g. a client
+ %% side message TTL expires.
+ #'v1_0.modified'{} ->
+ #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false};
#'v1_0.rejected'{} ->
#'basic.reject'{delivery_tag = DeliveryTag,
requeue = false};
diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
index aedd4873eb..fc3f7d6f1f 100644
--- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
+++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
@@ -11,6 +11,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-compile(nowarn_export_all).
-compile(export_all).
all() ->
@@ -22,6 +23,7 @@ all() ->
groups() ->
[
{tests, [], [
+ reliable_send_receive_with_outcomes,
roundtrip_quorum_queue_with_drain,
message_headers_conversion
]},
@@ -68,6 +70,75 @@ end_per_testcase(Testcase, Config) ->
%%% TESTS
%%%
+reliable_send_receive_with_outcomes(Config) ->
+ Outcomes = [accepted,
+ modified,
+ rejected,
+ released],
+ [begin
+ ct:pal("~s testing ~s", [?FUNCTION_NAME, Outcome]),
+ reliable_send_receive(Config, Outcome)
+ end || Outcome <- Outcomes],
+ ok.
+
+reliable_send_receive(Config, Outcome) ->
+ Container = atom_to_binary(?FUNCTION_NAME, utf8),
+ OutcomeBin = atom_to_binary(Outcome, utf8),
+ QName = <<Container/binary, OutcomeBin/binary>>,
+ %% declare a quorum queue
+ Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ durable = true,
+ arguments = [{<<"x-queue-type">>,
+ longstr, <<"quorum">>}]}),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ %% reliable send and consume
+ Host = ?config(rmq_hostname, Config),
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ Address = <<"/amq/queue/", QName/binary>>,
+
+ OpnConf = #{address => Host,
+ port => Port,
+ container_id => Container,
+ sasl => {plain, <<"guest">>, <<"guest">>}},
+ {ok, Connection} = amqp10_client:open_connection(OpnConf),
+ {ok, Session} = amqp10_client:begin_session(Connection),
+ SenderLinkName = <<"test-sender">>,
+ {ok, Sender} = amqp10_client:attach_sender_link(Session,
+ SenderLinkName,
+ Address),
+ ok = wait_for_credit(Sender),
+ DTag1 = <<"dtag-1">>,
+ %% create an unsettled message,
+ %% link will be in "mixed" mode by default
+ Msg1 = amqp10_msg:new(DTag1, <<"body-1">>, false),
+ ok = amqp10_client:send_msg(Sender, Msg1),
+ ok = wait_for_settlement(DTag1),
+
+ ok = amqp10_client:detach_link(Sender),
+ ok = amqp10_client:close_connection(Connection),
+ flush("post sender close"),
+
+ {ok, Connection2} = amqp10_client:open_connection(OpnConf),
+ {ok, Session2} = amqp10_client:begin_session(Connection2),
+ ReceiverLinkName = <<"test-receiver">>,
+ {ok, Receiver} = amqp10_client:attach_receiver_link(Session2,
+ ReceiverLinkName,
+ Address,
+ unsettled),
+ {ok, Msg} = amqp10_client:get_msg(Receiver),
+
+ ct:pal("got ~p", [amqp10_msg:body(Msg)]),
+
+ ok = amqp10_client:settle_msg(Receiver, Msg, Outcome),
+
+ flush("post accept"),
+
+ ok = amqp10_client:detach_link(Receiver),
+ ok = amqp10_client:close_connection(Connection2),
+
+ ok.
+
roundtrip_quorum_queue_with_drain(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
@@ -83,7 +154,7 @@ roundtrip_quorum_queue_with_drain(Config) ->
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},
-
+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
@@ -141,18 +212,18 @@ message_headers_conversion(Config) ->
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
-
+
rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_amqp091_headers_to_app_props, true]),
rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_app_props_to_amqp091_headers, true]),
-
+
OpnConf = #{address => Host,
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},
-
+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
-
+
amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address),
amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address),
@@ -173,7 +244,7 @@ amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) ->
wait_for_accepts(1),
{ok, Headers} = amqp091_get_msg_headers(Ch, QName),
-
+
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)),
?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)),
?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)).
@@ -252,22 +323,35 @@ open_and_close_connection(OpnConf) ->
ok = amqp10_client:close_connection(Connection).
% before we can send messages we have to wait for credit from the server
-wait_for_credit(Sender) ->
+wait_for_credit(Sender) ->
+ receive
+ {amqp10_event, {link, Sender, credited}} ->
+ flush(?FUNCTION_NAME),
+ ok
+ after 5000 ->
+ flush("wait_for_credit timed out"),
+ ct:fail(credited_timeout)
+ end.
+
+wait_for_settlement(Tag) ->
receive
- {amqp10_event, {link, Sender, credited}} ->
+ {amqp10_disposition, {accepted, Tag}} ->
+ flush(?FUNCTION_NAME),
ok
after 5000 ->
- flush("Credit timed out"),
- exit(credited_timeout)
+ flush("wait_for_settlement timed out"),
+ ct:fail(credited_timeout)
end.
wait_for_accepts(0) -> ok;
-wait_for_accepts(N) ->
- receive
- {amqp10_disposition,{accepted,_}} -> wait_for_accepts(N -1)
- after 250 ->
- ok
+wait_for_accepts(N) ->
+ receive
+ {amqp10_disposition,{accepted,_}} ->
+ wait_for_accepts(N -1)
+ after 250 ->
+ ok
end.
+
delete_queue(Config, QName) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
_ = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
diff --git a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs
index 6f083c2309..84ed881cc2 100755
--- a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs
+++ b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs
@@ -190,6 +190,7 @@ module Test =
let q = "roundtrip-091-q"
let corr = "corrlation"
let sender = SenderLink(c.Session, q + "-sender" , q)
+
new Message("hi"B, Header = Header(),
Properties = new Properties(CorrelationId = corr))
|> sender.Send