summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2023-02-01 21:41:19 -0500
committerGitHub <noreply@github.com>2023-02-01 21:41:19 -0500
commit70094e6c1f2cf18e9b51f51d2468c1aaaf4d32e1 (patch)
treef5629177de5940b661a49c28ff775bd5ad6752bd
parent535a0be564491c516e09264cb98f7e6d92507a4a (diff)
parent3bb32737ee9229612dcb776929cff795aff69151 (diff)
downloadrabbitmq-server-git-70094e6c1f2cf18e9b51f51d2468c1aaaf4d32e1.tar.gz
Merge pull request #7141 from rabbitmq/send-drained-bug-fix
Fix channel crash when draining AMQP 1.0 credits from classic queue
-rw-r--r--deps/amqp10_client/src/amqp10_client.erl11
-rw-r--r--deps/amqp10_client/src/amqp10_client_session.erl80
-rw-r--r--deps/rabbit/src/rabbit_channel.erl30
-rw-r--r--deps/rabbit/src/rabbit_classic_queue.erl9
-rw-r--r--deps/rabbit/src/rabbit_queue_consumers.erl10
-rw-r--r--deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl16
-rw-r--r--deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl41
7 files changed, 137 insertions, 60 deletions
diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl
index 78d065dd7b..d15c8df01b 100644
--- a/deps/amqp10_client/src/amqp10_client.erl
+++ b/deps/amqp10_client/src/amqp10_client.erl
@@ -269,7 +269,16 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
snd_settle_mode(), terminus_durability(), filter(),
properties()) ->
{ok, link_ref()}.
-attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) ->
+attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
+ when is_pid(Session) andalso
+ is_binary(Name) andalso
+ is_binary(Source) andalso
+ (SettleMode == unsettled orelse
+ SettleMode == settled orelse
+ SettleMode == mixed) andalso
+ is_atom(Durability) andalso
+ is_map(Filter) andalso
+ is_map(Properties) ->
AttachArgs = #{name => Name,
role => {receiver, #{address => Source,
durable => Durability}, self()},
diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl
index 0472b881ec..d6eec4a76a 100644
--- a/deps/amqp10_client/src/amqp10_client_session.erl
+++ b/deps/amqp10_client/src/amqp10_client_session.erl
@@ -648,41 +648,51 @@ translate_terminus_durability(none) -> 0;
translate_terminus_durability(configuration) -> 1;
translate_terminus_durability(unsettled_state) -> 2.
-translate_filters(Filters) when is_map(Filters) andalso map_size(Filters) =< 0 -> undefined;
-translate_filters(Filters) when is_map(Filters) -> {
- map,
- maps:fold(
- fun(<<"apache.org:legacy-amqp-direct-binding:string">> = K, V, Acc) when is_binary(V) ->
- [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc];
- (<<"apache.org:legacy-amqp-topic-binding:string">> = K, V, Acc) when is_binary(V) ->
- [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc];
- (<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
- [{{symbol, K}, {described, {symbol, K}, translate_legacy_amqp_headers_binding(V)}} | Acc];
- (<<"apache.org:no-local-filter:list">> = K, V, Acc) when is_list(V) ->
- [{{symbol, K}, {described, {symbol, K}, lists:map(fun(Id) -> {utf8, Id} end, V)}} | Acc];
- (<<"apache.org:selector-filter:string">> = K, V, Acc) when is_binary(V) ->
- [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc]
- end,
- [],
- Filters)
-}.
+translate_filters(Filters)
+ when is_map(Filters) andalso
+ map_size(Filters) == 0 ->
+ undefined;
+translate_filters(Filters)
+ when is_map(Filters) ->
+ {map,
+ maps:fold(
+ fun
+ (<<"apache.org:legacy-amqp-headers-binding:map">> = K, V, Acc) when is_map(V) ->
+ %% special case conversion
+ Key = sym(K),
+ [{Key, {described, Key, translate_legacy_amqp_headers_binding(V)}} | Acc];
+ (K, V, Acc) when is_binary(K) ->
+ %% try treat any filter value generically
+ Key = sym(K),
+ Value = filter_value_type(V),
+ [{Key, {described, Key, Value}} | Acc]
+ end, [], Filters)}.
+
+filter_value_type(V) when is_binary(V) ->
+ %% this is clearly not always correct
+ {utf8, V};
+filter_value_type(V)
+ when is_integer(V) andalso V >= 0 ->
+ {uint, V};
+filter_value_type(VList) when is_list(VList) ->
+ [filter_value_type(V) || V <- VList];
+filter_value_type({T, _} = V) when is_atom(T) ->
+ %% looks like an already tagged type, just pass it through
+ V.
% https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html
-translate_legacy_amqp_headers_binding(LegacyHeaders) -> {
- map,
- maps:fold(
- fun(<<"x-match">> = K, <<"any">> = V, Acc) ->
- [{{utf8, K}, {utf8, V}} | Acc];
- (<<"x-match">> = K, <<"all">> = V, Acc) ->
- [{{utf8, K}, {utf8, V}} | Acc];
- (<<"x-",_/binary>>, _, Acc) ->
- Acc;
- (K, V, Acc) ->
- [{{utf8, K}, {utf8, V}} | Acc]
- end,
- [],
- LegacyHeaders)
-}.
+translate_legacy_amqp_headers_binding(LegacyHeaders) ->
+ {map,
+ maps:fold(
+ fun(<<"x-match">> = K, <<"any">> = V, Acc) ->
+ [{{utf8, K}, {utf8, V}} | Acc];
+ (<<"x-match">> = K, <<"all">> = V, Acc) ->
+ [{{utf8, K}, {utf8, V}} | Acc];
+ (<<"x-", _/binary>>, _, Acc) ->
+ Acc;
+ (K, V, Acc) ->
+ [{{utf8, K}, filter_value_type(V)} | Acc]
+ end, [], LegacyHeaders)}.
send_detach(Send, {detach, OutHandle}, _From, State = #state{links = Links}) ->
case Links of
@@ -1011,8 +1021,10 @@ wrap_map_value(V) when is_atom(V) ->
utf8(atom_to_list(V)).
utf8(V) -> amqp10_client_types:utf8(V).
+
+sym(B) when is_binary(B) -> {symbol, B};
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
-sym(B) when is_binary(B) -> {symbol, B}.
+sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 2d5ccdcad4..1d81d98cd9 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -2831,16 +2831,7 @@ evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
handle_queue_actions(Actions, #ch{} = State0) ->
WriterPid = State0#ch.cfg#conf.writer_pid,
lists:foldl(
- fun ({send_credit_reply, Avail}, S0) ->
- ok = rabbit_writer:send_command(WriterPid,
- #'basic.credit_ok'{available = Avail}),
- S0;
- ({send_drained, {CTag, Credit}}, S0) ->
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.credit_drained'{consumer_tag = CTag,
- credit_drained = Credit}),
- S0;
+ fun
({settled, QRef, MsgSeqNos}, S0) ->
confirm(MsgSeqNos, QRef, S0);
({rejected, _QRef, MsgSeqNos}, S0) ->
@@ -2865,9 +2856,28 @@ handle_queue_actions(Actions, #ch{} = State0) ->
S0;
({unblock, QName}, S0) ->
credit_flow:unblock(QName),
+ S0;
+ ({send_credit_reply, Avail}, S0) ->
+ ok = rabbit_writer:send_command(WriterPid,
+ #'basic.credit_ok'{available = Avail}),
+ S0;
+ ({send_drained, {CTag, Credit}}, S0) ->
+ ok = send_drained_to_writer(WriterPid, CTag, Credit),
+ S0;
+ ({send_drained, CTagCredits}, S0) when is_list(CTagCredits) ->
+ %% this is the backwards compatible option that classic queues
+ %% used to send, this can be removed in 4.0
+ [ok = send_drained_to_writer(WriterPid, CTag, Credit)
+ || {CTag, Credit} <- CTagCredits],
S0
end, State0, Actions).
+send_drained_to_writer(WriterPid, CTag, Credit) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.credit_drained'{consumer_tag = CTag,
+ credit_drained = Credit}).
+
maybe_increase_global_publishers(#ch{publishing_mode = true} = State0) ->
State0;
maybe_increase_global_publishers(State0) ->
diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl
index a5b92bda94..abd5fd9d32 100644
--- a/deps/rabbit/src/rabbit_classic_queue.erl
+++ b/deps/rabbit/src/rabbit_classic_queue.erl
@@ -551,9 +551,14 @@ deliver_to_consumer(Pid, QName, CTag, AckRequired, Message) ->
Evt = {queue_event, QName, Deliver},
gen_server:cast(Pid, Evt).
-send_drained(Pid, QName, CTagCredits) ->
+send_drained(Pid, QName, CTagCredits) when is_list(CTagCredits) ->
+ [_ = gen_server:cast(Pid, {queue_event, QName,
+ {send_drained, CTagCredit}})
+ || CTagCredit <- CTagCredits],
+ ok;
+send_drained(Pid, QName, CTagCredit) when is_tuple(CTagCredit) ->
gen_server:cast(Pid, {queue_event, QName,
- {send_drained, CTagCredits}}).
+ {send_drained, CTagCredit}}).
send_credit_reply(Pid, QName, Len) when is_integer(Len) ->
gen_server:cast(Pid, {queue_event, QName,
diff --git a/deps/rabbit/src/rabbit_queue_consumers.erl b/deps/rabbit/src/rabbit_queue_consumers.erl
index 4bcb897f98..c89f7b85bc 100644
--- a/deps/rabbit/src/rabbit_queue_consumers.erl
+++ b/deps/rabbit/src/rabbit_queue_consumers.erl
@@ -193,9 +193,9 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
end.
-spec send_drained(rabbit_amqqueue:name()) -> 'ok'.
-
-send_drained(QName) -> [update_ch_record(send_drained(QName, C)) || C <- all_ch_record()],
- ok.
+send_drained(QName) ->
+ [update_ch_record(send_drained(QName, C)) || C <- all_ch_record()],
+ ok.
-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
rabbit_amqqueue:name(), state(), boolean(),
@@ -528,7 +528,7 @@ send_drained(QName, C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
case rabbit_limiter:drained(Limiter) of
{[], Limiter} -> C;
{CTagCredits, Limiter2} ->
- rabbit_classic_queue:send_drained(ChPid, QName, CTagCredits),
+ ok = rabbit_classic_queue:send_drained(ChPid, QName, CTagCredits),
C#cr{limiter = Limiter2}
end.
@@ -536,7 +536,7 @@ credit_and_drain(QName, C = #cr{ch_pid = ChPid, limiter = Limiter},
CTag, Credit, Mode, IsEmpty) ->
case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of
{true, Limiter1} ->
- rabbit_classic_queue:send_drained(ChPid, QName, [{CTag, Credit}]),
+ ok = rabbit_classic_queue:send_drained(ChPid, QName, [{CTag, Credit}]),
C#cr{limiter = Limiter1};
{false, Limiter1} -> C#cr{limiter = Limiter1}
end.
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl
index 4110202fc3..be42a03010 100644
--- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl
+++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl
@@ -250,7 +250,7 @@ transferred(DeliveryTag, Channel,
source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
Key = {symbol, <<"rabbitmq:stream-offset-spec">>},
- case lists:keyfind(Key, 1, KVList) of
+ case keyfind_unpack_described(Key, KVList) of
{_, {timestamp, Ts}} ->
[{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps
{_, {utf8, Spec}} ->
@@ -262,3 +262,17 @@ source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
end;
source_filters_to_consumer_args(_Source) ->
[].
+
+keyfind_unpack_described(Key, KvList) ->
+ %% filterset values _should_ be described values
+ %% they aren't always however for historical reasons so we need this bit of
+ %% code to return a plain value for the given filter key
+ case lists:keyfind(Key, 1, KvList) of
+ {Key, {described, Key, Value}} ->
+ {Key, Value};
+ {Key, _} = Kv ->
+ Kv;
+ false ->
+ false
+ end.
+
diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
index 66037564e2..6870b099fb 100644
--- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
+++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
@@ -24,7 +24,9 @@ groups() ->
[
{tests, [], [
reliable_send_receive_with_outcomes,
+ roundtrip_classic_queue_with_drain,
roundtrip_quorum_queue_with_drain,
+ roundtrip_stream_queue_with_drain,
message_headers_conversion
]},
{metrics, [], [
@@ -149,16 +151,28 @@ reliable_send_receive(Config, Outcome) ->
ok.
+roundtrip_classic_queue_with_drain(Config) ->
+ QName = atom_to_binary(?FUNCTION_NAME, utf8),
+ roundtrip_queue_with_drain(Config, <<"classic">>, QName).
+
roundtrip_quorum_queue_with_drain(Config) ->
+ QName = atom_to_binary(?FUNCTION_NAME, utf8),
+ roundtrip_queue_with_drain(Config, <<"quorum">>, QName).
+
+roundtrip_stream_queue_with_drain(Config) ->
+ QName = atom_to_binary(?FUNCTION_NAME, utf8),
+ roundtrip_queue_with_drain(Config, <<"stream">>, QName).
+
+roundtrip_queue_with_drain(Config, QueueType, QName) when is_binary(QueueType) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
- QName = atom_to_binary(?FUNCTION_NAME, utf8),
Address = <<"/amq/queue/", QName/binary>>,
- %% declare a quorum queue
+ %% declare a queue
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ Args = [{<<"x-queue-type">>, longstr, QueueType}],
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
- arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
+ arguments = Args}),
% create a configuration map
OpnConf = #{address => Host,
port => Port,
@@ -182,16 +196,29 @@ roundtrip_quorum_queue_with_drain(Config) ->
flush("pre-receive"),
% create a receiver link
- {ok, Receiver} = amqp10_client:attach_receiver_link(Session,
- <<"test-receiver">>,
- Address),
+
+ TerminusDurability = none,
+ Filter = case QueueType of
+ <<"stream">> ->
+ #{<<"rabbitmq:stream-offset-spec">> => <<"first">>};
+ _ ->
+ #{}
+ end,
+ Properties = #{},
+ {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>,
+ Address, unsettled,
+ TerminusDurability,
+ Filter, Properties),
% grant credit and drain
ok = amqp10_client:flow_link_credit(Receiver, 1, never, true),
% wait for a delivery
receive
- {amqp10_msg, Receiver, _InMsg} -> ok
+ {amqp10_msg, Receiver, InMsg} ->
+ ok = amqp10_client:accept_msg(Receiver, InMsg),
+ wait_for_accepts(1),
+ ok
after 2000 ->
exit(delivery_timeout)
end,