summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilip Kuryloski <kuryloskip@vmware.com>2020-11-16 16:04:36 +0100
committerGitHub <noreply@github.com>2020-11-16 16:04:36 +0100
commitd0d901f095e4d20e3560fb13feb1398ec236a84a (patch)
tree12d17b4dc2e3d4b385cc1161134276c9fb380cc5
parent4ca56d6f4290c1f039b1a08176920d8015bb5e56 (diff)
parent8ff5273827a45d7ad637ae6df3c9caf0e3c96cfb (diff)
downloadrabbitmq-server-git-d0d901f095e4d20e3560fb13feb1398ec236a84a.tar.gz
Merge pull request #2504 from rabbitmq/qq-credit-mode
Use correct credit mode x-credit
-rw-r--r--deps/rabbit/src/rabbit_fifo_client.erl32
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl43
-rw-r--r--deps/rabbit/test/rabbit_fifo_int_SUITE.erl17
3 files changed, 45 insertions, 47 deletions
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
index 6673cadc93..3990222b15 100644
--- a/deps/rabbit/src/rabbit_fifo_client.erl
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -15,7 +15,6 @@
init/2,
init/3,
init/5,
- checkout/4,
checkout/5,
cancel_checkout/2,
enqueue/2,
@@ -349,27 +348,6 @@ discard(ConsumerTag, [_|_] = MsgIds,
end, {[], [], MsgIds}, Unsent0),
{State0#state{unsent_commands = Unsent}, []}.
-
-%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
-%% become available.
-%%
-%% This is a synchronous call. I.e. the call will block until the command
-%% has been accepted by the ra process or it times out.
-%%
-%% @param ConsumerTag a unique tag to identify this particular consumer.
-%% @param NumUnsettled the maximum number of in-flight messages. Once this
-%% number of messages has been received but not settled no further messages
-%% will be delivered to the consumer.
-%% @param State The {@module} state.
-%%
-%% @returns `{ok, State}' or `{error | timeout, term()}'
--spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(),
- rabbit_fifo:consumer_meta(),
- state()) -> {ok, state()} | {error | timeout, term()}.
-checkout(ConsumerTag, NumUnsettled, ConsumerInfo, State0)
- when is_map(ConsumerInfo) ->
- checkout(ConsumerTag, NumUnsettled, get_credit_mode(ConsumerInfo), ConsumerInfo, State0).
-
%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
%% become available.
%%
@@ -908,13 +886,3 @@ find_leader([Server | Servers]) ->
qref({Ref, _}) -> Ref;
qref(Ref) -> Ref.
-
-get_credit_mode(#{args := Args}) ->
- case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
- {_Key, Value} ->
- Value;
- _ ->
- simple_prefetch
- end;
-get_credit_mode(_) ->
- simple_prefetch.
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index a51fc3f43e..95cc93d728 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -667,20 +667,33 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
ConsumerTag = quorum_ctag(ConsumerTag0),
%% A prefetch count of 0 means no limitation,
%% let's make it into something large for ra
- Prefetch = case ConsumerPrefetchCount of
- 0 -> 2000;
- Other -> Other
- end,
+ Prefetch0 = case ConsumerPrefetchCount of
+ 0 -> 2000;
+ Other -> Other
+ end,
%% consumer info is used to describe the consumer properties
AckRequired = not NoAck,
ConsumerMeta = #{ack => AckRequired,
prefetch => ConsumerPrefetchCount,
args => Args,
username => ActingUser},
- {ok, QState} = rabbit_fifo_client:checkout(ConsumerTag,
- Prefetch,
- ConsumerMeta,
- QState0),
+
+ {CreditMode, Credit, Drain} = parse_credit_args(Prefetch0, Args),
+ %% if the mode is credited we should send a separate credit command
+ %% after checkout and give 0 credits initally
+ Prefetch = case CreditMode of
+ credited -> 0;
+ simple_prefetch -> Prefetch0
+ end,
+ {ok, QState1} = rabbit_fifo_client:checkout(ConsumerTag, Prefetch,
+ CreditMode, ConsumerMeta,
+ QState0),
+ QState = case CreditMode of
+ credited when Credit > 0 ->
+ rabbit_fifo_client:credit(ConsumerTag, Credit, Drain,
+ QState1);
+ _ -> QState1
+ end,
case ra:local_query(QPid,
fun rabbit_fifo:query_single_active_consumer/1) of
{ok, {_, SacResult}, _} ->
@@ -1494,3 +1507,17 @@ overflow(<<"reject-publish-dlx">> = V, Def, QName) ->
rabbit_log:warning("Invalid overflow strategy ~p for quorum queue: ~p",
[V, rabbit_misc:rs(QName)]),
Def.
+
+parse_credit_args(Default, Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
+ {table, T} ->
+ case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, C}, {bool, D}} ->
+ {credited, C, D};
+ _ ->
+ {simple_prefetch, Default, false}
+ end;
+ undefined ->
+ {simple_prefetch, Default, false}
+ end.
diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl
index 927014b882..37f5436dbf 100644
--- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl
@@ -86,7 +86,8 @@ basics(Config) ->
CustomerTag = UId,
ok = start_cluster(ClusterName, [ServerId]),
FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, #{}, FState0),
+ {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, simple_prefetch,
+ #{}, FState0),
ra_log_wal:force_roll_over(ra_log_wal),
% create segment the segment will trigger a snapshot
@@ -179,7 +180,7 @@ duplicate_delivery(Config) ->
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F0),
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
Fun = fun Loop(S0) ->
receive
@@ -214,7 +215,7 @@ usage(Config) ->
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F0),
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
{ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2),
{_, _, _} = process_ra_events(receive_ra_events(2, 2), F3),
@@ -267,7 +268,7 @@ detects_lost_delivery(Config) ->
F000 = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, F00} = rabbit_fifo_client:enqueue(msg1, F000),
{_, _, F0} = process_ra_events(receive_ra_events(1, 0), F00),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F0),
{ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
% lose first delivery
@@ -297,6 +298,7 @@ returns_after_down(Config) ->
_Pid = spawn(fun () ->
F = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10,
+ simple_prefetch,
#{}, F),
Self ! checkout_done
end),
@@ -376,7 +378,8 @@ discard(Config) ->
_ = ra:members(ServerId),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10,
+ simple_prefetch, #{}, F0),
{ok, F2} = rabbit_fifo_client:enqueue(msg1, F1),
F3 = discard_next_delivery(F2, 5000),
{empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
@@ -397,7 +400,7 @@ cancel_checkout(Config) ->
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
- {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F1),
+ {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F1),
{_, _, F3} = process_ra_events(receive_ra_events(1, 1), F2, [], [], fun (_, S) -> S end),
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
{F5, _} = rabbit_fifo_client:return(<<"tag">>, [0], F4),
@@ -490,7 +493,7 @@ test_queries(Config) ->
exit(ready_timeout)
end,
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
- {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, #{}, F0),
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, simple_prefetch, #{}, F0),
{ok, {_, Ready}, _} = ra:local_query(ServerId,
fun rabbit_fifo:query_messages_ready/1),
?assertEqual(1, Ready),