summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2023-02-01 13:13:59 +0000
committerKarl Nilsson <kjnilsson@gmail.com>2023-02-01 15:34:29 +0000
commit3bb32737ee9229612dcb776929cff795aff69151 (patch)
tree8acab1ade1c07a3ab897f3c0f35ff8cff7b41835
parent149f3e5d34b418dce2852c61df27fc80b4a3c9de (diff)
downloadrabbitmq-server-git-3bb32737ee9229612dcb776929cff795aff69151.tar.gz
Fix channel crash when draining AMQP 1.0 credits from classic queue
Classic queues used a different format for the `{send_drained, _}` queue type action which was missed originally. This change handles both formats in the channel for backwards compatibility as well as changes classic queues to conform to the same format when sending the queue event. Whilst adding tests for this in the amqp10 plugin another issue around the amqp10_client and filters was discovered and this commit also includes improvements in this area. Such as more leninet support of source filters.
-rw-r--r--deps/amqp10_client/src/amqp10_client.erl11
-rw-r--r--deps/amqp10_client/src/amqp10_client_session.erl80
-rw-r--r--deps/rabbit/src/rabbit_channel.erl30
-rw-r--r--deps/rabbit/src/rabbit_classic_queue.erl9
-rw-r--r--deps/rabbit/src/rabbit_queue_consumers.erl10
-rw-r--r--deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl16
-rw-r--r--deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl41
7 files changed, 137 insertions, 60 deletions
diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl
index 78d065dd7b..d15c8df01b 100644
--- a/deps/amqp10_client/src/amqp10_client.erl
+++ b/deps/amqp10_client/src/amqp10_client.erl
@@ -269,7 +269,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
snd_settle_mode(), terminus_durability(), filter(),
properties()) ->
{ok, link_ref()}.
-attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) ->
+attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
+ when is_pid(Session) andalso
+ is_binary(Name) andalso
+ is_binary(Source) andalso
+ (SettleMode == unsettled orelse
+ SettleMode == settled orelse
+ SettleMode == mixed) andalso
+ is_atom(Durability) andalso
+ is_map(Filter) andalso
+ is_map(Properties) ->
AttachArgs = #{name => Name,
role => {receiver, #{address => Source,
durable => Durability}, self()},
diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl
index 0472b881ec..d6eec4a76a 100644
--- a/deps/amqp10_client/src/amqp10_client_session.erl
+++ b/deps/amqp10_client/src/amqp10_client_session.erl
@@ -648,41 +648,51 @@ translate_terminus_durability(none) -> 0;
translate_terminus_durability(configuration) -> 1;
translate_terminus_durability(unsettled_state) -> 2.
-translate_filters(Filters) when is_map(Filters) andalso map_size(Filters) =< 0 -> undefined;
-translate_filters(Filters) when is_map(Filters) -> {
- map,
- maps:fold(
- fun(<<"apache.org:legacy-amqp-direct-binding:string">> = K, V, Acc) when is_binary(V) ->
- [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc];
- (<<"apache.org:legacy-amqp-topic-binding:string">> = K, V, Acc) when is_binary(V) ->
- [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc];
- (<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
- [{{symbol, K}, {described, {symbol, K}, translate_legacy_amqp_headers_binding(V)}} | Acc];
- (<<"apache.org:no-local-filter:list">> = K, V, Acc) when is_list(V) ->
- [{{symbol, K}, {described, {symbol, K}, lists:map(fun(Id) -> {utf8, Id} end, V)}} | Acc];
- (<<"apache.org:selector-filter:string">> = K, V, Acc) when is_binary(V) ->
- [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc]
- end,
- [],
- Filters)
-}.
+translate_filters(Filters)
+ when is_map(Filters) andalso
+ map_size(Filters) == 0 ->
+ undefined;
+translate_filters(Filters)
+ when is_map(Filters) ->
+ {map,
+ maps:fold(
+ fun
+ (<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
+ %% special case conversion
+ Key = sym(K),
+ [{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc];
+ (K, V, Acc) when is_binary(K) ->
+ %% try treat any filter value generically
+ Key = sym(K),
+ Value = filter_value_type(V),
+ [{Key, {described, Key, Value}} | Acc]
+ end, [], Filters)}.
+
+filter_value_type(V) when is_binary(V) ->
+ %% this is clearly not always correct
+ {utf8, V};
+filter_value_type(V)
+ when is_integer(V) andalso V >= 0 ->
+ {uint, V};
+filter_value_type(VList) when is_list(VList) ->
+ [filter_value_type(V) || V <- VList];
+filter_value_type({T, _} = V) when is_atom(T) ->
+ %% looks like an already tagged type, just pass it through
+ V.
% https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html
-translate_legacy_amqp_headers_binding(LegacyHeaders) -> {
- map,
- maps:fold(
- fun(<<"x-match">> = K, <<"any">> = V, Acc) ->
- [{{utf8, K}, {utf8, V}} | Acc];
- (<<"x-match">> = K, <<"all">> = V, Acc) ->
- [{{utf8, K}, {utf8, V}} | Acc];
- (<<"x-",_/binary>>, _, Acc) ->
- Acc;
- (K, V, Acc) ->
- [{{utf8, K}, {utf8, V}} | Acc]
- end,
- [],
- LegacyHeaders)
-}.
+translate_legacy_amqp_headers_binding(LegacyHeaders) ->
+ {map,
+ maps:fold(
+ fun(<<"x-match">> = K, <<"any">> = V, Acc) ->
+ [{{utf8, K}, {utf8, V}} | Acc];
+ (<<"x-match">> = K, <<"all">> = V, Acc) ->
+ [{{utf8, K}, {utf8, V}} | Acc];
+ (<<"x-", _/binary>>, _, Acc) ->
+ Acc;
+ (K, V, Acc) ->
+ [{{utf8, K}, filter_value_type(V)} | Acc]
+ end, [], LegacyHeaders)}.
send_detach(Send, {detach, OutHandle}, _From, State = #state{links = Links}) ->
case Links of
@@ -1011,8 +1021,10 @@ wrap_map_value(V) when is_atom(V) ->
utf8(atom_to_list(V)).
utf8(V) -> amqp10_client_types:utf8(V).
+
+sym(B) when is_binary(B) -> {symbol, B};
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
-sym(B) when is_binary(B) -> {symbol, B}.
+sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 2d5ccdcad4..1d81d98cd9 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -2831,16 +2831,7 @@ evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
handle_queue_actions(Actions, #ch{} = State0) ->
WriterPid = State0#ch.cfg#conf.writer_pid,
lists:foldl(
- fun ({send_credit_reply, Avail}, S0) ->
- ok = rabbit_writer:send_command(WriterPid,
- #'basic.credit_ok'{available = Avail}),
- S0;
- ({send_drained, {CTag, Credit}}, S0) ->
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.credit_drained'{consumer_tag = CTag,
- credit_drained = Credit}),
- S0;
+ fun
({settled, QRef, MsgSeqNos}, S0) ->
confirm(MsgSeqNos, QRef, S0);
({rejected, _QRef, MsgSeqNos}, S0) ->
@@ -2865,9 +2856,28 @@ handle_queue_actions(Actions, #ch{} = State0) ->
S0;
({unblock, QName}, S0) ->
credit_flow:unblock(QName),
+ S0;
+ ({send_credit_reply, Avail}, S0) ->
+ ok = rabbit_writer:send_command(WriterPid,
+ #'basic.credit_ok'{available = Avail}),
+ S0;
+ ({send_drained, {CTag, Credit}}, S0) ->
+ ok = send_drained_to_writer(WriterPid, CTag, Credit),
+ S0;
+ ({send_drained, CTagCredits}, S0) when is_list(CTagCredits) ->
+ %% this is the backwards compatible option that classic queues
+ %% used to send, this can be removed in 4.0
+ [ok = send_drained_to_writer(WriterPid, CTag, Credit)
+ || {CTag, Credit} <- CTagCredits],
S0
end, State0, Actions).
+send_drained_to_writer(WriterPid, CTag, Credit) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.credit_drained'{consumer_tag = CTag,
+ credit_drained = Credit}).
+
maybe_increase_global_publishers(#ch{publishing_mode = true} = State0) ->
State0;
maybe_increase_global_publishers(State0) ->
diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl
index a5b92bda94..abd5fd9d32 100644
--- a/deps/rabbit/src/rabbit_classic_queue.erl
+++ b/deps/rabbit/src/rabbit_classic_queue.erl
@@ -551,9 +551,14 @@ deliver_to_consumer(Pid, QName, CTag, AckRequired, Message) ->
Evt = {queue_event, QName, Deliver},
gen_server:cast(Pid, Evt).
-send_drained(Pid, QName, CTagCredits) ->
+send_drained(Pid, QName, CTagCredits) when is_list(CTagCredits) ->
+ [_ = gen_server:cast(Pid, {queue_event, QName,
+ {send_drained, CTagCredit}})
+ || CTagCredit <- CTagCredits],
+ ok;
+send_drained(Pid, QName, CTagCredit) when is_tuple(CTagCredit) ->
gen_server:cast(Pid, {queue_event, QName,
- {send_drained, CTagCredits}}).
+ {send_drained, CTagCredit}}).
send_credit_reply(Pid, QName, Len) when is_integer(Len) ->
gen_server:cast(Pid, {queue_event, QName,
diff --git a/deps/rabbit/src/rabbit_queue_consumers.erl b/deps/rabbit/src/rabbit_queue_consumers.erl
index 4bcb897f98..c89f7b85bc 100644
--- a/deps/rabbit/src/rabbit_queue_consumers.erl
+++ b/deps/rabbit/src/rabbit_queue_consumers.erl
@@ -193,9 +193,9 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
end.
-spec send_drained(rabbit_amqqueue:name()) -> 'ok'.
-
-send_drained(QName) -> [update_ch_record(send_drained(QName, C)) || C <- all_ch_record()],
- ok.
+send_drained(QName) ->
+ [update_ch_record(send_drained(QName, C)) || C <- all_ch_record()],
+ ok.
-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
rabbit_amqqueue:name(), state(), boolean(),
@@ -528,7 +528,7 @@ send_drained(QName, C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
case rabbit_limiter:drained(Limiter) of
{[], Limiter} -> C;
{CTagCredits, Limiter2} ->
- rabbit_classic_queue:send_drained(ChPid, QName, CTagCredits),
+ ok = rabbit_classic_queue:send_drained(ChPid, QName, CTagCredits),
C#cr{limiter = Limiter2}
end.
@@ -536,7 +536,7 @@ credit_and_drain(QName, C = #cr{ch_pid = ChPid, limiter = Limiter},
CTag, Credit, Mode, IsEmpty) ->
case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of
{true, Limiter1} ->
- rabbit_classic_queue:send_drained(ChPid, QName, [{CTag, Credit}]),
+ ok = rabbit_classic_queue:send_drained(ChPid, QName, [{CTag, Credit}]),
C#cr{limiter = Limiter1};
{false, Limiter1} -> C#cr{limiter = Limiter1}
end.
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl
index 4110202fc3..be42a03010 100644
--- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl
+++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl
@@ -250,7 +250,7 @@ transferred(DeliveryTag, Channel,
source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
Key = {symbol, <<"rabbitmq:stream-offset-spec">>},
- case lists:keyfind(Key, 1, KVList) of
+ case keyfind_unpack_described(Key, KVList) of
{_, {timestamp, Ts}} ->
[{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps
{_, {utf8, Spec}} ->
@@ -262,3 +262,17 @@ source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
end;
source_filters_to_consumer_args(_Source) ->
[].
+
+keyfind_unpack_described(Key, KvList) ->
+ %% filterset values _should_ be described values
+ %% they aren't always however for historical reasons so we need this bit of
+ %% code to return a plain value for the given filter key
+ case lists:keyfind(Key, 1, KvList) of
+ {Key, {described, Key, Value}} ->
+ {Key, Value};
+ {Key, _} = Kv ->
+ Kv;
+ false ->
+ false
+ end.
+
diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
index 66037564e2..6870b099fb 100644
--- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
+++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
@@ -24,7 +24,9 @@ groups() ->
[
{tests, [], [
reliable_send_receive_with_outcomes,
+ roundtrip_classic_queue_with_drain,
roundtrip_quorum_queue_with_drain,
+ roundtrip_stream_queue_with_drain,
message_headers_conversion
]},
{metrics, [], [
@@ -149,16 +151,28 @@ reliable_send_receive(Config, Outcome) ->
ok.
+roundtrip_classic_queue_with_drain(Config) ->
+ QName = atom_to_binary(?FUNCTION_NAME, utf8),
+ roundtrip_queue_with_drain(Config, <<"classic">>, QName).
+
roundtrip_quorum_queue_with_drain(Config) ->
+ QName = atom_to_binary(?FUNCTION_NAME, utf8),
+ roundtrip_queue_with_drain(Config, <<"quorum">>, QName).
+
+roundtrip_stream_queue_with_drain(Config) ->
+ QName = atom_to_binary(?FUNCTION_NAME, utf8),
+ roundtrip_queue_with_drain(Config, <<"stream">>, QName).
+
+roundtrip_queue_with_drain(Config, QueueType, QName) when is_binary(QueueType) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
- QName = atom_to_binary(?FUNCTION_NAME, utf8),
Address = <<"/amq/queue/", QName/binary>>,
- %% declare a quorum queue
+ %% declare a queue
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ Args = [{<<"x-queue-type">>, longstr, QueueType}],
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
- arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
+ arguments = Args}),
% create a configuration map
OpnConf = #{address => Host,
port => Port,
@@ -182,16 +196,29 @@ roundtrip_quorum_queue_with_drain(Config) ->
flush("pre-receive"),
% create a receiver link
- {ok, Receiver} = amqp10_client:attach_receiver_link(Session,
- <<"test-receiver">>,
- Address),
+
+ TerminusDurability = none,
+ Filter = case QueueType of
+ <<"stream">> ->
+ #{<<"rabbitmq:stream-offset-spec">> => <<"first">>};
+ _ ->
+ #{}
+ end,
+ Properties = #{},
+ {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>,
+ Address, unsettled,
+ TerminusDurability,
+ Filter, Properties),
% grant credit and drain
ok = amqp10_client:flow_link_credit(Receiver, 1, never, true),
% wait for a delivery
receive
- {amqp10_msg, Receiver, _InMsg} -> ok
+ {amqp10_msg, Receiver, InMsg} ->
+ ok = amqp10_client:accept_msg(Receiver, InMsg),
+ wait_for_accepts(1),
+ ok
after 2000 ->
exit(delivery_timeout)
end,