diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2023-02-01 13:13:59 +0000 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2023-02-01 15:34:29 +0000 |
commit | 3bb32737ee9229612dcb776929cff795aff69151 (patch) | |
tree | 8acab1ade1c07a3ab897f3c0f35ff8cff7b41835 | |
parent | 149f3e5d34b418dce2852c61df27fc80b4a3c9de (diff) | |
download | rabbitmq-server-git-3bb32737ee9229612dcb776929cff795aff69151.tar.gz |
Fix channel crash when draining AMQP 1.0 credits from classic queue
Classic queues used a different format for the `{send_drained, _}`
queue type action which was missed originally. This change handles both
formats in the channel for backwards compatibility
as well as changes classic queues to conform to the same format when
sending the queue event.
Whilst adding tests for this in the amqp10 plugin another issue around
the amqp10_client and filters was discovered and this commit also includes
improvements in this area. Such as more leninet support of source filters.
-rw-r--r-- | deps/amqp10_client/src/amqp10_client.erl | 11 | ||||
-rw-r--r-- | deps/amqp10_client/src/amqp10_client_session.erl | 80 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 30 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_classic_queue.erl | 9 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_consumers.erl | 10 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl | 16 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl | 41 |
7 files changed, 137 insertions, 60 deletions
diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index 78d065dd7b..d15c8df01b 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -269,7 +269,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) -> snd_settle_mode(), terminus_durability(), filter(), properties()) -> {ok, link_ref()}. -attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) -> +attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) + when is_pid(Session) andalso + is_binary(Name) andalso + is_binary(Source) andalso + (SettleMode == unsettled orelse + SettleMode == settled orelse + SettleMode == mixed) andalso + is_atom(Durability) andalso + is_map(Filter) andalso + is_map(Properties) -> AttachArgs = #{name => Name, role => {receiver, #{address => Source, durable => Durability}, self()}, diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 0472b881ec..d6eec4a76a 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -648,41 +648,51 @@ translate_terminus_durability(none) -> 0; translate_terminus_durability(configuration) -> 1; translate_terminus_durability(unsettled_state) -> 2. -translate_filters(Filters) when is_map(Filters) andalso map_size(Filters) =< 0 -> undefined; -translate_filters(Filters) when is_map(Filters) -> { - map, - maps:fold( - fun(<<"apache.org:legacy-amqp-direct-binding:string">> = K, V, Acc) when is_binary(V) -> - [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc]; - (<<"apache.org:legacy-amqp-topic-binding:string">> = K, V, Acc) when is_binary(V) -> - [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc]; - (<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) -> - [{{symbol, K}, {described, {symbol, K}, translate_legacy_amqp_headers_binding(V)}} | Acc]; - (<<"apache.org:no-local-filter:list">> = K, V, Acc) when is_list(V) -> - [{{symbol, K}, {described, {symbol, K}, lists:map(fun(Id) -> {utf8, Id} end, V)}} | Acc]; - (<<"apache.org:selector-filter:string">> = K, V, Acc) when is_binary(V) -> - [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc] - end, - [], - Filters) -}. +translate_filters(Filters) + when is_map(Filters) andalso + map_size(Filters) == 0 -> + undefined; +translate_filters(Filters) + when is_map(Filters) -> + {map, + maps:fold( + fun + (<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) -> + %% special case conversion + Key = sym(K), + [{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc]; + (K, V, Acc) when is_binary(K) -> + %% try treat any filter value generically + Key = sym(K), + Value = filter_value_type(V), + [{Key, {described, Key, Value}} | Acc] + end, [], Filters)}. + +filter_value_type(V) when is_binary(V) -> + %% this is clearly not always correct + {utf8, V}; +filter_value_type(V) + when is_integer(V) andalso V >= 0 -> + {uint, V}; +filter_value_type(VList) when is_list(VList) -> + [filter_value_type(V) || V <- VList]; +filter_value_type({T, _} = V) when is_atom(T) -> + %% looks like an already tagged type, just pass it through + V. % https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html -translate_legacy_amqp_headers_binding(LegacyHeaders) -> { - map, - maps:fold( - fun(<<"x-match">> = K, <<"any">> = V, Acc) -> - [{{utf8, K}, {utf8, V}} | Acc]; - (<<"x-match">> = K, <<"all">> = V, Acc) -> - [{{utf8, K}, {utf8, V}} | Acc]; - (<<"x-",_/binary>>, _, Acc) -> - Acc; - (K, V, Acc) -> - [{{utf8, K}, {utf8, V}} | Acc] - end, - [], - LegacyHeaders) -}. +translate_legacy_amqp_headers_binding(LegacyHeaders) -> + {map, + maps:fold( + fun(<<"x-match">> = K, <<"any">> = V, Acc) -> + [{{utf8, K}, {utf8, V}} | Acc]; + (<<"x-match">> = K, <<"all">> = V, Acc) -> + [{{utf8, K}, {utf8, V}} | Acc]; + (<<"x-", _/binary>>, _, Acc) -> + Acc; + (K, V, Acc) -> + [{{utf8, K}, filter_value_type(V)} | Acc] + end, [], LegacyHeaders)}. send_detach(Send, {detach, OutHandle}, _From, State = #state{links = Links}) -> case Links of @@ -1011,8 +1021,10 @@ wrap_map_value(V) when is_atom(V) -> utf8(atom_to_list(V)). utf8(V) -> amqp10_client_types:utf8(V). + +sym(B) when is_binary(B) -> {symbol, B}; sym(B) when is_list(B) -> {symbol, list_to_binary(B)}; -sym(B) when is_binary(B) -> {symbol, B}. +sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 2d5ccdcad4..1d81d98cd9 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2831,16 +2831,7 @@ evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel, handle_queue_actions(Actions, #ch{} = State0) -> WriterPid = State0#ch.cfg#conf.writer_pid, lists:foldl( - fun ({send_credit_reply, Avail}, S0) -> - ok = rabbit_writer:send_command(WriterPid, - #'basic.credit_ok'{available = Avail}), - S0; - ({send_drained, {CTag, Credit}}, S0) -> - ok = rabbit_writer:send_command( - WriterPid, - #'basic.credit_drained'{consumer_tag = CTag, - credit_drained = Credit}), - S0; + fun ({settled, QRef, MsgSeqNos}, S0) -> confirm(MsgSeqNos, QRef, S0); ({rejected, _QRef, MsgSeqNos}, S0) -> @@ -2865,9 +2856,28 @@ handle_queue_actions(Actions, #ch{} = State0) -> S0; ({unblock, QName}, S0) -> credit_flow:unblock(QName), + S0; + ({send_credit_reply, Avail}, S0) -> + ok = rabbit_writer:send_command(WriterPid, + #'basic.credit_ok'{available = Avail}), + S0; + ({send_drained, {CTag, Credit}}, S0) -> + ok = send_drained_to_writer(WriterPid, CTag, Credit), + S0; + ({send_drained, CTagCredits}, S0) when is_list(CTagCredits) -> + %% this is the backwards compatible option that classic queues + %% used to send, this can be removed in 4.0 + [ok = send_drained_to_writer(WriterPid, CTag, Credit) + || {CTag, Credit} <- CTagCredits], S0 end, State0, Actions). +send_drained_to_writer(WriterPid, CTag, Credit) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.credit_drained'{consumer_tag = CTag, + credit_drained = Credit}). + maybe_increase_global_publishers(#ch{publishing_mode = true} = State0) -> State0; maybe_increase_global_publishers(State0) -> diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index a5b92bda94..abd5fd9d32 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -551,9 +551,14 @@ deliver_to_consumer(Pid, QName, CTag, AckRequired, Message) -> Evt = {queue_event, QName, Deliver}, gen_server:cast(Pid, Evt). -send_drained(Pid, QName, CTagCredits) -> +send_drained(Pid, QName, CTagCredits) when is_list(CTagCredits) -> + [_ = gen_server:cast(Pid, {queue_event, QName, + {send_drained, CTagCredit}}) + || CTagCredit <- CTagCredits], + ok; +send_drained(Pid, QName, CTagCredit) when is_tuple(CTagCredit) -> gen_server:cast(Pid, {queue_event, QName, - {send_drained, CTagCredits}}). + {send_drained, CTagCredit}}). send_credit_reply(Pid, QName, Len) when is_integer(Len) -> gen_server:cast(Pid, {queue_event, QName, diff --git a/deps/rabbit/src/rabbit_queue_consumers.erl b/deps/rabbit/src/rabbit_queue_consumers.erl index 4bcb897f98..c89f7b85bc 100644 --- a/deps/rabbit/src/rabbit_queue_consumers.erl +++ b/deps/rabbit/src/rabbit_queue_consumers.erl @@ -193,9 +193,9 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> end. -spec send_drained(rabbit_amqqueue:name()) -> 'ok'. - -send_drained(QName) -> [update_ch_record(send_drained(QName, C)) || C <- all_ch_record()], - ok. +send_drained(QName) -> + [update_ch_record(send_drained(QName, C)) || C <- all_ch_record()], + ok. -spec deliver(fun ((boolean()) -> {fetch_result(), T}), rabbit_amqqueue:name(), state(), boolean(), @@ -528,7 +528,7 @@ send_drained(QName, C = #cr{ch_pid = ChPid, limiter = Limiter}) -> case rabbit_limiter:drained(Limiter) of {[], Limiter} -> C; {CTagCredits, Limiter2} -> - rabbit_classic_queue:send_drained(ChPid, QName, CTagCredits), + ok = rabbit_classic_queue:send_drained(ChPid, QName, CTagCredits), C#cr{limiter = Limiter2} end. @@ -536,7 +536,7 @@ credit_and_drain(QName, C = #cr{ch_pid = ChPid, limiter = Limiter}, CTag, Credit, Mode, IsEmpty) -> case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of {true, Limiter1} -> - rabbit_classic_queue:send_drained(ChPid, QName, [{CTag, Credit}]), + ok = rabbit_classic_queue:send_drained(ChPid, QName, [{CTag, Credit}]), C#cr{limiter = Limiter1}; {false, Limiter1} -> C#cr{limiter = Limiter1} end. diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl index 4110202fc3..be42a03010 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl @@ -250,7 +250,7 @@ transferred(DeliveryTag, Channel, source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) -> Key = {symbol, <<"rabbitmq:stream-offset-spec">>}, - case lists:keyfind(Key, 1, KVList) of + case keyfind_unpack_described(Key, KVList) of {_, {timestamp, Ts}} -> [{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps {_, {utf8, Spec}} -> @@ -262,3 +262,17 @@ source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) -> end; source_filters_to_consumer_args(_Source) -> []. + +keyfind_unpack_described(Key, KvList) -> + %% filterset values _should_ be described values + %% they aren't always however for historical reasons so we need this bit of + %% code to return a plain value for the given filter key + case lists:keyfind(Key, 1, KvList) of + {Key, {described, Key, Value}} -> + {Key, Value}; + {Key, _} = Kv -> + Kv; + false -> + false + end. + diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl index 66037564e2..6870b099fb 100644 --- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl @@ -24,7 +24,9 @@ groups() -> [ {tests, [], [ reliable_send_receive_with_outcomes, + roundtrip_classic_queue_with_drain, roundtrip_quorum_queue_with_drain, + roundtrip_stream_queue_with_drain, message_headers_conversion ]}, {metrics, [], [ @@ -149,16 +151,28 @@ reliable_send_receive(Config, Outcome) -> ok. +roundtrip_classic_queue_with_drain(Config) -> + QName = atom_to_binary(?FUNCTION_NAME, utf8), + roundtrip_queue_with_drain(Config, <<"classic">>, QName). + roundtrip_quorum_queue_with_drain(Config) -> + QName = atom_to_binary(?FUNCTION_NAME, utf8), + roundtrip_queue_with_drain(Config, <<"quorum">>, QName). + +roundtrip_stream_queue_with_drain(Config) -> + QName = atom_to_binary(?FUNCTION_NAME, utf8), + roundtrip_queue_with_drain(Config, <<"stream">>, QName). + +roundtrip_queue_with_drain(Config, QueueType, QName) when is_binary(QueueType) -> Host = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - QName = atom_to_binary(?FUNCTION_NAME, utf8), Address = <<"/amq/queue/", QName/binary>>, - %% declare a quorum queue + %% declare a queue Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + Args = [{<<"x-queue-type">>, longstr, QueueType}], amqp_channel:call(Ch, #'queue.declare'{queue = QName, durable = true, - arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}), + arguments = Args}), % create a configuration map OpnConf = #{address => Host, port => Port, @@ -182,16 +196,29 @@ roundtrip_quorum_queue_with_drain(Config) -> flush("pre-receive"), % create a receiver link - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, - <<"test-receiver">>, - Address), + + TerminusDurability = none, + Filter = case QueueType of + <<"stream">> -> + #{<<"rabbitmq:stream-offset-spec">> => <<"first">>}; + _ -> + #{} + end, + Properties = #{}, + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>, + Address, unsettled, + TerminusDurability, + Filter, Properties), % grant credit and drain ok = amqp10_client:flow_link_credit(Receiver, 1, never, true), % wait for a delivery receive - {amqp10_msg, Receiver, _InMsg} -> ok + {amqp10_msg, Receiver, InMsg} -> + ok = amqp10_client:accept_msg(Receiver, InMsg), + wait_for_accepts(1), + ok after 2000 -> exit(delivery_timeout) end, |