diff options
author | Michael Klishin <klishinm@vmware.com> | 2022-10-25 15:10:41 +0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-25 15:10:41 +0400 |
commit | 2b404b579b58f9dcd9d309f149e62cf52d14480b (patch) | |
tree | 2d3834da036ea515523345e8cf830d33e09371dc | |
parent | 6d64d2fc2ca94d55c67d120a6be2a2357a2094e8 (diff) | |
parent | 6a42c8a34a02326b5fc650d1f97033c867e3a760 (diff) | |
download | rabbitmq-server-git-2b404b579b58f9dcd9d309f149e62cf52d14480b.tar.gz |
Merge pull request #6241 from rabbitmq/mergify/bp/v3.11.x/pr-6228
AMQP 1.0: Support the modified outcome (backport #6228)
6 files changed, 122 insertions, 23 deletions
diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index 785f44f005..a848818554 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -32,6 +32,7 @@ detach_link/1, send_msg/2, accept_msg/2, + settle_msg/3, flow_link_credit/3, flow_link_credit/4, echo/1, @@ -335,11 +336,18 @@ send_msg(#link_ref{role = sender, session = Session, %% @doc Accept a message on a the link referred to be the 'LinkRef'. -spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok. -accept_msg(#link_ref{role = receiver, session = Session}, Msg) -> +accept_msg(LinkRef, Msg) -> + settle_msg(LinkRef, Msg, accepted). + +%% @doc Settle a message on a the link referred to be the 'LinkRef' using +%% the chosen delivery state. +-spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(), + amqp10_client_types:delivery_state()) -> ok. +settle_msg(#link_ref{role = receiver, + session = Session}, Msg, Settlement) -> DeliveryId = amqp10_msg:delivery_id(Msg), amqp10_client_session:disposition(Session, receiver, DeliveryId, - DeliveryId, true, accepted). - + DeliveryId, true, Settlement). %% @doc Get a single message from a link. %% Flows a single link credit then awaits delivery or timeout. -spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}. diff --git a/deps/amqp10_client/src/amqp10_client.hrl b/deps/amqp10_client/src/amqp10_client.hrl index 38627f80c5..becbd75861 100644 --- a/deps/amqp10_client/src/amqp10_client.hrl +++ b/deps/amqp10_client/src/amqp10_client.hrl @@ -20,5 +20,6 @@ -define(DBG(F, A), ok). -endif. --record(link_ref, {role :: sender | receiver, session :: pid(), +-record(link_ref, {role :: sender | receiver, + session :: pid(), link_handle :: non_neg_integer()}). diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl index 48bc4ba932..82838a9ba1 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl @@ -14,14 +14,11 @@ -define(EXCHANGE_SUB_LIFETIME, "delete-on-close"). -define(DEFAULT_OUTCOME, #'v1_0.released'{}). --define(SUPPORTED_OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED, - ?V_1_0_SYMBOL_REJECTED, - ?V_1_0_SYMBOL_RELEASED]). - -define(OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED, ?V_1_0_SYMBOL_REJECTED, ?V_1_0_SYMBOL_RELEASED, ?V_1_0_SYMBOL_MODIFIED]). +-define(SUPPORTED_OUTCOMES, ?OUTCOMES). outcomes(Source) -> {DefaultOutcome, Outcomes} = diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl index 71bba9ce5d..8c69effe84 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl @@ -277,6 +277,14 @@ handle_control(#'v1_0.disposition'{state = Outcome, #'v1_0.accepted'{} -> #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}; + %% we don't care if the client modified the + %% so just treat it as accepted. + %% Some clients send modified instead of accepted + %% when e.g. a client + %% side message TTL expires. + #'v1_0.modified'{} -> + #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}; #'v1_0.rejected'{} -> #'basic.reject'{delivery_tag = DeliveryTag, requeue = false}; diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl index aedd4873eb..fc3f7d6f1f 100644 --- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl @@ -11,6 +11,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-compile(nowarn_export_all). -compile(export_all). all() -> @@ -22,6 +23,7 @@ all() -> groups() -> [ {tests, [], [ + reliable_send_receive_with_outcomes, roundtrip_quorum_queue_with_drain, message_headers_conversion ]}, @@ -68,6 +70,75 @@ end_per_testcase(Testcase, Config) -> %%% TESTS %%% +reliable_send_receive_with_outcomes(Config) -> + Outcomes = [accepted, + modified, + rejected, + released], + [begin + ct:pal("~s testing ~s", [?FUNCTION_NAME, Outcome]), + reliable_send_receive(Config, Outcome) + end || Outcome <- Outcomes], + ok. + +reliable_send_receive(Config, Outcome) -> + Container = atom_to_binary(?FUNCTION_NAME, utf8), + OutcomeBin = atom_to_binary(Outcome, utf8), + QName = <<Container/binary, OutcomeBin/binary>>, + %% declare a quorum queue + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = true, + arguments = [{<<"x-queue-type">>, + longstr, <<"quorum">>}]}), + rabbit_ct_client_helpers:close_channel(Ch), + %% reliable send and consume + Host = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + Address = <<"/amq/queue/", QName/binary>>, + + OpnConf = #{address => Host, + port => Port, + container_id => Container, + sasl => {plain, <<"guest">>, <<"guest">>}}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session(Connection), + SenderLinkName = <<"test-sender">>, + {ok, Sender} = amqp10_client:attach_sender_link(Session, + SenderLinkName, + Address), + ok = wait_for_credit(Sender), + DTag1 = <<"dtag-1">>, + %% create an unsettled message, + %% link will be in "mixed" mode by default + Msg1 = amqp10_msg:new(DTag1, <<"body-1">>, false), + ok = amqp10_client:send_msg(Sender, Msg1), + ok = wait_for_settlement(DTag1), + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:close_connection(Connection), + flush("post sender close"), + + {ok, Connection2} = amqp10_client:open_connection(OpnConf), + {ok, Session2} = amqp10_client:begin_session(Connection2), + ReceiverLinkName = <<"test-receiver">>, + {ok, Receiver} = amqp10_client:attach_receiver_link(Session2, + ReceiverLinkName, + Address, + unsettled), + {ok, Msg} = amqp10_client:get_msg(Receiver), + + ct:pal("got ~p", [amqp10_msg:body(Msg)]), + + ok = amqp10_client:settle_msg(Receiver, Msg, Outcome), + + flush("post accept"), + + ok = amqp10_client:detach_link(Receiver), + ok = amqp10_client:close_connection(Connection2), + + ok. + roundtrip_quorum_queue_with_drain(Config) -> Host = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), @@ -83,7 +154,7 @@ roundtrip_quorum_queue_with_drain(Config) -> port => Port, container_id => atom_to_binary(?FUNCTION_NAME, utf8), sasl => {plain, <<"guest">>, <<"guest">>}}, - + {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session(Connection), SenderLinkName = <<"test-sender">>, @@ -141,18 +212,18 @@ message_headers_conversion(Config) -> amqp_channel:call(Ch, #'queue.declare'{queue = QName, durable = true, arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}), - + rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_amqp091_headers_to_app_props, true]), rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_app_props_to_amqp091_headers, true]), - + OpnConf = #{address => Host, port => Port, container_id => atom_to_binary(?FUNCTION_NAME, utf8), sasl => {plain, <<"guest">>, <<"guest">>}}, - + {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session(Connection), - + amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address), amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address), @@ -173,7 +244,7 @@ amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) -> wait_for_accepts(1), {ok, Headers} = amqp091_get_msg_headers(Ch, QName), - + ?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)), ?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)), ?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)). @@ -252,22 +323,35 @@ open_and_close_connection(OpnConf) -> ok = amqp10_client:close_connection(Connection). % before we can send messages we have to wait for credit from the server -wait_for_credit(Sender) -> +wait_for_credit(Sender) -> + receive + {amqp10_event, {link, Sender, credited}} -> + flush(?FUNCTION_NAME), + ok + after 5000 -> + flush("wait_for_credit timed out"), + ct:fail(credited_timeout) + end. + +wait_for_settlement(Tag) -> receive - {amqp10_event, {link, Sender, credited}} -> + {amqp10_disposition, {accepted, Tag}} -> + flush(?FUNCTION_NAME), ok after 5000 -> - flush("Credit timed out"), - exit(credited_timeout) + flush("wait_for_settlement timed out"), + ct:fail(credited_timeout) end. wait_for_accepts(0) -> ok; -wait_for_accepts(N) -> - receive - {amqp10_disposition,{accepted,_}} -> wait_for_accepts(N -1) - after 250 -> - ok +wait_for_accepts(N) -> + receive + {amqp10_disposition,{accepted,_}} -> + wait_for_accepts(N -1) + after 250 -> + ok end. + delete_queue(Config, QName) -> Ch = rabbit_ct_client_helpers:open_channel(Config, 0), _ = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), diff --git a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs index 6f083c2309..84ed881cc2 100755 --- a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs +++ b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs @@ -190,6 +190,7 @@ module Test = let q = "roundtrip-091-q" let corr = "corrlation" let sender = SenderLink(c.Session, q + "-sender" , q) + new Message("hi"B, Header = Header(), Properties = new Properties(CorrelationId = corr)) |> sender.Send |