diff options
author | Michael Klishin <klishinm@vmware.com> | 2023-02-01 21:41:19 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-01 21:41:19 -0500 |
commit | 70094e6c1f2cf18e9b51f51d2468c1aaaf4d32e1 (patch) | |
tree | f5629177de5940b661a49c28ff775bd5ad6752bd | |
parent | 535a0be564491c516e09264cb98f7e6d92507a4a (diff) | |
parent | 3bb32737ee9229612dcb776929cff795aff69151 (diff) | |
download | rabbitmq-server-git-70094e6c1f2cf18e9b51f51d2468c1aaaf4d32e1.tar.gz |
Merge pull request #7141 from rabbitmq/send-drained-bug-fix
Fix channel crash when draining AMQP 1.0 credits from classic queue
-rw-r--r-- | deps/amqp10_client/src/amqp10_client.erl | 11 | ||||
-rw-r--r-- | deps/amqp10_client/src/amqp10_client_session.erl | 80 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 30 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_classic_queue.erl | 9 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_consumers.erl | 10 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl | 16 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl | 41 |
7 files changed, 137 insertions, 60 deletions
diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index 78d065dd7b..d15c8df01b 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -269,7 +269,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) -> snd_settle_mode(), terminus_durability(), filter(), properties()) -> {ok, link_ref()}. -attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) -> +attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) + when is_pid(Session) andalso + is_binary(Name) andalso + is_binary(Source) andalso + (SettleMode == unsettled orelse + SettleMode == settled orelse + SettleMode == mixed) andalso + is_atom(Durability) andalso + is_map(Filter) andalso + is_map(Properties) -> AttachArgs = #{name => Name, role => {receiver, #{address => Source, durable => Durability}, self()}, diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 0472b881ec..d6eec4a76a 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -648,41 +648,51 @@ translate_terminus_durability(none) -> 0; translate_terminus_durability(configuration) -> 1; translate_terminus_durability(unsettled_state) -> 2. -translate_filters(Filters) when is_map(Filters) andalso map_size(Filters) =< 0 -> undefined; -translate_filters(Filters) when is_map(Filters) -> { - map, - maps:fold( - fun(<<"apache.org:legacy-amqp-direct-binding:string">> = K, V, Acc) when is_binary(V) -> - [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc]; - (<<"apache.org:legacy-amqp-topic-binding:string">> = K, V, Acc) when is_binary(V) -> - [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc]; - (<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) -> - [{{symbol, K}, {described, {symbol, K}, translate_legacy_amqp_headers_binding(V)}} | Acc]; - (<<"apache.org:no-local-filter:list">> = K, V, Acc) when is_list(V) -> - [{{symbol, K}, {described, {symbol, K}, lists:map(fun(Id) -> {utf8, Id} end, V)}} | Acc]; - (<<"apache.org:selector-filter:string">> = K, V, Acc) when is_binary(V) -> - [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc] - end, - [], - Filters) -}. +translate_filters(Filters) + when is_map(Filters) andalso + map_size(Filters) == 0 -> + undefined; +translate_filters(Filters) + when is_map(Filters) -> + {map, + maps:fold( + fun + (<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) -> + %% special case conversion + Key = sym(K), + [{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc]; + (K, V, Acc) when is_binary(K) -> + %% try treat any filter value generically + Key = sym(K), + Value = filter_value_type(V), + [{Key, {described, Key, Value}} | Acc] + end, [], Filters)}. + +filter_value_type(V) when is_binary(V) -> + %% this is clearly not always correct + {utf8, V}; +filter_value_type(V) + when is_integer(V) andalso V >= 0 -> + {uint, V}; +filter_value_type(VList) when is_list(VList) -> + [filter_value_type(V) || V <- VList]; +filter_value_type({T, _} = V) when is_atom(T) -> + %% looks like an already tagged type, just pass it through + V. % https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html -translate_legacy_amqp_headers_binding(LegacyHeaders) -> { - map, - maps:fold( - fun(<<"x-match">> = K, <<"any">> = V, Acc) -> - [{{utf8, K}, {utf8, V}} | Acc]; - (<<"x-match">> = K, <<"all">> = V, Acc) -> - [{{utf8, K}, {utf8, V}} | Acc]; - (<<"x-",_/binary>>, _, Acc) -> - Acc; - (K, V, Acc) -> - [{{utf8, K}, {utf8, V}} | Acc] - end, - [], - LegacyHeaders) -}. +translate_legacy_amqp_headers_binding(LegacyHeaders) -> + {map, + maps:fold( + fun(<<"x-match">> = K, <<"any">> = V, Acc) -> + [{{utf8, K}, {utf8, V}} | Acc]; + (<<"x-match">> = K, <<"all">> = V, Acc) -> + [{{utf8, K}, {utf8, V}} | Acc]; + (<<"x-", _/binary>>, _, Acc) -> + Acc; + (K, V, Acc) -> + [{{utf8, K}, filter_value_type(V)} | Acc] + end, [], LegacyHeaders)}. send_detach(Send, {detach, OutHandle}, _From, State = #state{links = Links}) -> case Links of @@ -1011,8 +1021,10 @@ wrap_map_value(V) when is_atom(V) -> utf8(atom_to_list(V)). utf8(V) -> amqp10_client_types:utf8(V). + +sym(B) when is_binary(B) -> {symbol, B}; sym(B) when is_list(B) -> {symbol, list_to_binary(B)}; -sym(B) when is_binary(B) -> {symbol, B}. +sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 2d5ccdcad4..1d81d98cd9 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2831,16 +2831,7 @@ evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel, handle_queue_actions(Actions, #ch{} = State0) -> WriterPid = State0#ch.cfg#conf.writer_pid, lists:foldl( - fun ({send_credit_reply, Avail}, S0) -> - ok = rabbit_writer:send_command(WriterPid, - #'basic.credit_ok'{available = Avail}), - S0; - ({send_drained, {CTag, Credit}}, S0) -> - ok = rabbit_writer:send_command( - WriterPid, - #'basic.credit_drained'{consumer_tag = CTag, - credit_drained = Credit}), - S0; + fun ({settled, QRef, MsgSeqNos}, S0) -> confirm(MsgSeqNos, QRef, S0); ({rejected, _QRef, MsgSeqNos}, S0) -> @@ -2865,9 +2856,28 @@ handle_queue_actions(Actions, #ch{} = State0) -> S0; ({unblock, QName}, S0) -> credit_flow:unblock(QName), + S0; + ({send_credit_reply, Avail}, S0) -> + ok = rabbit_writer:send_command(WriterPid, + #'basic.credit_ok'{available = Avail}), + S0; + ({send_drained, {CTag, Credit}}, S0) -> + ok = send_drained_to_writer(WriterPid, CTag, Credit), + S0; + ({send_drained, CTagCredits}, S0) when is_list(CTagCredits) -> + %% this is the backwards compatible option that classic queues + %% used to send, this can be removed in 4.0 + [ok = send_drained_to_writer(WriterPid, CTag, Credit) + || {CTag, Credit} <- CTagCredits], S0 end, State0, Actions). +send_drained_to_writer(WriterPid, CTag, Credit) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.credit_drained'{consumer_tag = CTag, + credit_drained = Credit}). + maybe_increase_global_publishers(#ch{publishing_mode = true} = State0) -> State0; maybe_increase_global_publishers(State0) -> diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index a5b92bda94..abd5fd9d32 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -551,9 +551,14 @@ deliver_to_consumer(Pid, QName, CTag, AckRequired, Message) -> Evt = {queue_event, QName, Deliver}, gen_server:cast(Pid, Evt). -send_drained(Pid, QName, CTagCredits) -> +send_drained(Pid, QName, CTagCredits) when is_list(CTagCredits) -> + [_ = gen_server:cast(Pid, {queue_event, QName, + {send_drained, CTagCredit}}) + || CTagCredit <- CTagCredits], + ok; +send_drained(Pid, QName, CTagCredit) when is_tuple(CTagCredit) -> gen_server:cast(Pid, {queue_event, QName, - {send_drained, CTagCredits}}). + {send_drained, CTagCredit}}). send_credit_reply(Pid, QName, Len) when is_integer(Len) -> gen_server:cast(Pid, {queue_event, QName, diff --git a/deps/rabbit/src/rabbit_queue_consumers.erl b/deps/rabbit/src/rabbit_queue_consumers.erl index 4bcb897f98..c89f7b85bc 100644 --- a/deps/rabbit/src/rabbit_queue_consumers.erl +++ b/deps/rabbit/src/rabbit_queue_consumers.erl @@ -193,9 +193,9 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> end. -spec send_drained(rabbit_amqqueue:name()) -> 'ok'. - -send_drained(QName) -> [update_ch_record(send_drained(QName, C)) || C <- all_ch_record()], - ok. +send_drained(QName) -> + [update_ch_record(send_drained(QName, C)) || C <- all_ch_record()], + ok. -spec deliver(fun ((boolean()) -> {fetch_result(), T}), rabbit_amqqueue:name(), state(), boolean(), @@ -528,7 +528,7 @@ send_drained(QName, C = #cr{ch_pid = ChPid, limiter = Limiter}) -> case rabbit_limiter:drained(Limiter) of {[], Limiter} -> C; {CTagCredits, Limiter2} -> - rabbit_classic_queue:send_drained(ChPid, QName, CTagCredits), + ok = rabbit_classic_queue:send_drained(ChPid, QName, CTagCredits), C#cr{limiter = Limiter2} end. @@ -536,7 +536,7 @@ credit_and_drain(QName, C = #cr{ch_pid = ChPid, limiter = Limiter}, CTag, Credit, Mode, IsEmpty) -> case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of {true, Limiter1} -> - rabbit_classic_queue:send_drained(ChPid, QName, [{CTag, Credit}]), + ok = rabbit_classic_queue:send_drained(ChPid, QName, [{CTag, Credit}]), C#cr{limiter = Limiter1}; {false, Limiter1} -> C#cr{limiter = Limiter1} end. diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl index 4110202fc3..be42a03010 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl @@ -250,7 +250,7 @@ transferred(DeliveryTag, Channel, source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) -> Key = {symbol, <<"rabbitmq:stream-offset-spec">>}, - case lists:keyfind(Key, 1, KVList) of + case keyfind_unpack_described(Key, KVList) of {_, {timestamp, Ts}} -> [{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps {_, {utf8, Spec}} -> @@ -262,3 +262,17 @@ source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) -> end; source_filters_to_consumer_args(_Source) -> []. + +keyfind_unpack_described(Key, KvList) -> + %% filterset values _should_ be described values + %% they aren't always however for historical reasons so we need this bit of + %% code to return a plain value for the given filter key + case lists:keyfind(Key, 1, KvList) of + {Key, {described, Key, Value}} -> + {Key, Value}; + {Key, _} = Kv -> + Kv; + false -> + false + end. + diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl index 66037564e2..6870b099fb 100644 --- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl @@ -24,7 +24,9 @@ groups() -> [ {tests, [], [ reliable_send_receive_with_outcomes, + roundtrip_classic_queue_with_drain, roundtrip_quorum_queue_with_drain, + roundtrip_stream_queue_with_drain, message_headers_conversion ]}, {metrics, [], [ @@ -149,16 +151,28 @@ reliable_send_receive(Config, Outcome) -> ok. +roundtrip_classic_queue_with_drain(Config) -> + QName = atom_to_binary(?FUNCTION_NAME, utf8), + roundtrip_queue_with_drain(Config, <<"classic">>, QName). + roundtrip_quorum_queue_with_drain(Config) -> + QName = atom_to_binary(?FUNCTION_NAME, utf8), + roundtrip_queue_with_drain(Config, <<"quorum">>, QName). + +roundtrip_stream_queue_with_drain(Config) -> + QName = atom_to_binary(?FUNCTION_NAME, utf8), + roundtrip_queue_with_drain(Config, <<"stream">>, QName). + +roundtrip_queue_with_drain(Config, QueueType, QName) when is_binary(QueueType) -> Host = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - QName = atom_to_binary(?FUNCTION_NAME, utf8), Address = <<"/amq/queue/", QName/binary>>, - %% declare a quorum queue + %% declare a queue Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + Args = [{<<"x-queue-type">>, longstr, QueueType}], amqp_channel:call(Ch, #'queue.declare'{queue = QName, durable = true, - arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}), + arguments = Args}), % create a configuration map OpnConf = #{address => Host, port => Port, @@ -182,16 +196,29 @@ roundtrip_quorum_queue_with_drain(Config) -> flush("pre-receive"), % create a receiver link - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, - <<"test-receiver">>, - Address), + + TerminusDurability = none, + Filter = case QueueType of + <<"stream">> -> + #{<<"rabbitmq:stream-offset-spec">> => <<"first">>}; + _ -> + #{} + end, + Properties = #{}, + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>, + Address, unsettled, + TerminusDurability, + Filter, Properties), % grant credit and drain ok = amqp10_client:flow_link_credit(Receiver, 1, never, true), % wait for a delivery receive - {amqp10_msg, Receiver, _InMsg} -> ok + {amqp10_msg, Receiver, InMsg} -> + ok = amqp10_client:accept_msg(Receiver, InMsg), + wait_for_accepts(1), + ok after 2000 -> exit(delivery_timeout) end, |