diff options
author | Philip Kuryloski <kuryloskip@vmware.com> | 2020-11-16 16:04:36 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-16 16:04:36 +0100 |
commit | d0d901f095e4d20e3560fb13feb1398ec236a84a (patch) | |
tree | 12d17b4dc2e3d4b385cc1161134276c9fb380cc5 | |
parent | 4ca56d6f4290c1f039b1a08176920d8015bb5e56 (diff) | |
parent | 8ff5273827a45d7ad637ae6df3c9caf0e3c96cfb (diff) | |
download | rabbitmq-server-git-d0d901f095e4d20e3560fb13feb1398ec236a84a.tar.gz |
Merge pull request #2504 from rabbitmq/qq-credit-mode
Use correct credit mode x-credit
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_client.erl | 32 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 43 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_int_SUITE.erl | 17 |
3 files changed, 45 insertions, 47 deletions
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 6673cadc93..3990222b15 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -15,7 +15,6 @@ init/2, init/3, init/5, - checkout/4, checkout/5, cancel_checkout/2, enqueue/2, @@ -349,27 +348,6 @@ discard(ConsumerTag, [_|_] = MsgIds, end, {[], [], MsgIds}, Unsent0), {State0#state{unsent_commands = Unsent}, []}. - -%% @doc Register with the rabbit_fifo queue to "checkout" messages as they -%% become available. -%% -%% This is a synchronous call. I.e. the call will block until the command -%% has been accepted by the ra process or it times out. -%% -%% @param ConsumerTag a unique tag to identify this particular consumer. -%% @param NumUnsettled the maximum number of in-flight messages. Once this -%% number of messages has been received but not settled no further messages -%% will be delivered to the consumer. -%% @param State The {@module} state. -%% -%% @returns `{ok, State}' or `{error | timeout, term()}' --spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(), - rabbit_fifo:consumer_meta(), - state()) -> {ok, state()} | {error | timeout, term()}. -checkout(ConsumerTag, NumUnsettled, ConsumerInfo, State0) - when is_map(ConsumerInfo) -> - checkout(ConsumerTag, NumUnsettled, get_credit_mode(ConsumerInfo), ConsumerInfo, State0). - %% @doc Register with the rabbit_fifo queue to "checkout" messages as they %% become available. %% @@ -908,13 +886,3 @@ find_leader([Server | Servers]) -> qref({Ref, _}) -> Ref; qref(Ref) -> Ref. - -get_credit_mode(#{args := Args}) -> - case rabbit_misc:table_lookup(Args, <<"x-credit">>) of - {_Key, Value} -> - Value; - _ -> - simple_prefetch - end; -get_credit_mode(_) -> - simple_prefetch. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index a51fc3f43e..95cc93d728 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -667,20 +667,33 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> ConsumerTag = quorum_ctag(ConsumerTag0), %% A prefetch count of 0 means no limitation, %% let's make it into something large for ra - Prefetch = case ConsumerPrefetchCount of - 0 -> 2000; - Other -> Other - end, + Prefetch0 = case ConsumerPrefetchCount of + 0 -> 2000; + Other -> Other + end, %% consumer info is used to describe the consumer properties AckRequired = not NoAck, ConsumerMeta = #{ack => AckRequired, prefetch => ConsumerPrefetchCount, args => Args, username => ActingUser}, - {ok, QState} = rabbit_fifo_client:checkout(ConsumerTag, - Prefetch, - ConsumerMeta, - QState0), + + {CreditMode, Credit, Drain} = parse_credit_args(Prefetch0, Args), + %% if the mode is credited we should send a separate credit command + %% after checkout and give 0 credits initally + Prefetch = case CreditMode of + credited -> 0; + simple_prefetch -> Prefetch0 + end, + {ok, QState1} = rabbit_fifo_client:checkout(ConsumerTag, Prefetch, + CreditMode, ConsumerMeta, + QState0), + QState = case CreditMode of + credited when Credit > 0 -> + rabbit_fifo_client:credit(ConsumerTag, Credit, Drain, + QState1); + _ -> QState1 + end, case ra:local_query(QPid, fun rabbit_fifo:query_single_active_consumer/1) of {ok, {_, SacResult}, _} -> @@ -1494,3 +1507,17 @@ overflow(<<"reject-publish-dlx">> = V, Def, QName) -> rabbit_log:warning("Invalid overflow strategy ~p for quorum queue: ~p", [V, rabbit_misc:rs(QName)]), Def. + +parse_credit_args(Default, Args) -> + case rabbit_misc:table_lookup(Args, <<"x-credit">>) of + {table, T} -> + case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, C}, {bool, D}} -> + {credited, C, D}; + _ -> + {simple_prefetch, Default, false} + end; + undefined -> + {simple_prefetch, Default, false} + end. diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index 927014b882..37f5436dbf 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -86,7 +86,8 @@ basics(Config) -> CustomerTag = UId, ok = start_cluster(ClusterName, [ServerId]), FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, #{}, FState0), + {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, simple_prefetch, + #{}, FState0), ra_log_wal:force_roll_over(ra_log_wal), % create segment the segment will trigger a snapshot @@ -179,7 +180,7 @@ duplicate_delivery(Config) -> ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F0), {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), Fun = fun Loop(S0) -> receive @@ -214,7 +215,7 @@ usage(Config) -> ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F0), {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2), {_, _, _} = process_ra_events(receive_ra_events(2, 2), F3), @@ -267,7 +268,7 @@ detects_lost_delivery(Config) -> F000 = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000), {_, _, F0} = process_ra_events(receive_ra_events(1, 0), F00), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F0), {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), % lose first delivery @@ -297,6 +298,7 @@ returns_after_down(Config) -> _Pid = spawn(fun () -> F = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, + simple_prefetch, #{}, F), Self ! checkout_done end), @@ -376,7 +378,8 @@ discard(Config) -> _ = ra:members(ServerId), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, + simple_prefetch, #{}, F0), {ok, F2} = rabbit_fifo_client:enqueue(msg1, F1), F3 = discard_next_delivery(F2, 5000), {empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3), @@ -397,7 +400,7 @@ cancel_checkout(Config) -> ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), - {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, #{}, F1), + {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, simple_prefetch, #{}, F1), {_, _, F3} = process_ra_events(receive_ra_events(1, 1), F2, [], [], fun (_, S) -> S end), {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), {F5, _} = rabbit_fifo_client:return(<<"tag">>, [0], F4), @@ -490,7 +493,7 @@ test_queries(Config) -> exit(ready_timeout) end, F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), - {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, #{}, F0), + {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, simple_prefetch, #{}, F0), {ok, {_, Ready}, _} = ra:local_query(ServerId, fun rabbit_fifo:query_messages_ready/1), ?assertEqual(1, Ready), |