summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-09-21 16:25:13 +0100
committerkjnilsson <knilsson@pivotal.io>2020-09-25 10:05:24 +0100
commit4a916b0057e8daf319fea4e020171aaadd4333d1 (patch)
tree547ee0a9781ed42e3c475918aa03ced0ef7cc445
parent8468954a87b9287a64d2936afed3a36b521462c4 (diff)
downloadrabbitmq-server-git-4a916b0057e8daf319fea4e020171aaadd4333d1.tar.gz
Quorum queue consumer prioritiesqq-consumer-priorities
This switches the service queue inside rabbit_fifo from a normal queue to a priority queue such that consumers with a higher priority are favoured for service.
-rw-r--r--src/rabbit_fifo.erl65
-rw-r--r--src/rabbit_fifo.hrl5
-rw-r--r--test/quorum_queue_SUITE.erl81
-rw-r--r--test/rabbit_fifo_SUITE.erl26
4 files changed, 151 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 79b2bb4f72..d56f21a477 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -305,7 +305,7 @@ apply(#{index := Index,
{State0, {dequeue, empty}};
Ready ->
State1 = update_consumer(ConsumerId, ConsumerMeta,
- {once, 1, simple_prefetch},
+ {once, 1, simple_prefetch}, 0,
State0),
{success, _, MsgId, Msg, State2} = checkout_one(Meta, State1),
{State4, Effects1} = case Settlement of
@@ -345,7 +345,8 @@ apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId},
State0) ->
- State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
+ Priority = get_priority_from_args(ConsumerMeta),
+ State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, Priority, State0),
checkout(Meta, State0, State1, [{monitor, process, Pid}]);
apply(#{index := Index}, #purge{},
#?MODULE{ra_indexes = Indexes0,
@@ -524,6 +525,14 @@ convert_v0_to_v1(V0State0) ->
pending = element(3, E),
status = element(4, E)}
end, V0Enqs),
+ V0Cons = rabbit_fifo_v0:get_field(consumers, V0State),
+ V1Cons = maps:map(
+ fun (_CId, C0) ->
+ %% add the priority field
+ list_to_tuple(tuple_to_list(C0) ++ [0])
+ end, V0Cons),
+ V0SQ = rabbit_fifo_v0:get_field(service_queue, V0State),
+ V1SQ = priority_queue:from_list(queue:to_list(V0SQ)),
Cfg = #cfg{name = rabbit_fifo_v0:get_cfg_field(name, V0State),
resource = rabbit_fifo_v0:get_cfg_field(resource, V0State),
release_cursor_interval = rabbit_fifo_v0:get_cfg_field(release_cursor_interval, V0State),
@@ -547,8 +556,8 @@ convert_v0_to_v1(V0State0) ->
enqueuers = V1Enqs,
ra_indexes = rabbit_fifo_v0:get_field(ra_indexes, V0State),
release_cursors = rabbit_fifo_v0:get_field(release_cursors, V0State),
- consumers = rabbit_fifo_v0:get_field(consumers, V0State),
- service_queue = rabbit_fifo_v0:get_field(service_queue, V0State),
+ consumers = V1Cons,
+ service_queue = V1SQ,
prefix_msgs = rabbit_fifo_v0:get_field(prefix_msgs, V0State),
msg_bytes_enqueue = rabbit_fifo_v0:get_field(msg_bytes_enqueue, V0State),
msg_bytes_checkout = rabbit_fifo_v0:get_field(msg_bytes_checkout, V0State),
@@ -1670,13 +1679,12 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
end}.
checkout_one(Meta, #?MODULE{service_queue = SQ0,
- messages = Messages0,
- consumers = Cons0} = InitState) ->
- case queue:peek(SQ0) of
- {value, ConsumerId} ->
+ messages = Messages0,
+ consumers = Cons0} = InitState) ->
+ case priority_queue:out(SQ0) of
+ {{value, ConsumerId}, SQ1} ->
case take_next_msg(InitState) of
{ConsumerMsg, State0} ->
- SQ1 = queue:drop(SQ0),
%% there are consumers waiting to be serviced
%% process consumer checkout
case maps:find(ConsumerId, Cons0) of
@@ -1727,7 +1735,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
empty ->
{nochange, InitState}
end;
- empty ->
+ {empty, _} ->
case lqueue:len(Messages0) of
0 -> {nochange, InitState};
_ -> {inactive, InitState}
@@ -1742,7 +1750,7 @@ update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto} = Con,
#?MODULE{consumers = Cons,
service_queue = ServiceQueue} = State) ->
State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
- service_queue = uniq_queue_in(ConsumerId, ServiceQueue)};
+ service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)};
update_or_remove_sub(#{system_time := Ts},
ConsumerId, #consumer{lifetime = once,
checked_out = Checked,
@@ -1761,43 +1769,45 @@ update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = once} = Con,
#?MODULE{consumers = Cons,
service_queue = ServiceQueue} = State) ->
State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
- service_queue = uniq_queue_in(ConsumerId, ServiceQueue)}.
+ service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)}.
-uniq_queue_in(Key, Queue) ->
+uniq_queue_in(Key, #consumer{priority = P}, Queue) ->
% TODO: queue:member could surely be quite expensive, however the practical
% number of unique consumers may not be large enough for it to matter
- case queue:member(Key, Queue) of
+ case priority_queue:member(Key, Queue) of
true ->
Queue;
false ->
- queue:in(Key, Queue)
+ priority_queue:in(Key, P, Queue)
end.
-update_consumer(ConsumerId, Meta, Spec,
+update_consumer(ConsumerId, Meta, Spec, Priority,
#?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) ->
%% general case, single active consumer off
- update_consumer0(ConsumerId, Meta, Spec, State0);
-update_consumer(ConsumerId, Meta, Spec,
+ update_consumer0(ConsumerId, Meta, Spec, Priority, State0);
+update_consumer(ConsumerId, Meta, Spec, Priority,
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active}} = State0)
when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
- update_consumer0(ConsumerId, Meta, Spec, State0);
-update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
+ update_consumer0(ConsumerId, Meta, Spec, Priority, State0);
+update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, Priority,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
Consumer = #consumer{lifetime = Life, meta = Meta,
+ priority = Priority,
credit = Credit, credit_mode = Mode},
WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
State0#?MODULE{waiting_consumers = WaitingConsumers1}.
-update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
+update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, Priority,
#?MODULE{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
Init = #consumer{lifetime = Life, meta = Meta,
+ priority = Priority,
credit = Credit, credit_mode = Mode},
Cons = maps:update_with(ConsumerId,
fun(S) ->
@@ -1811,12 +1821,12 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
ServiceQueue0),
State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}.
-maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
+maybe_queue_consumer(ConsumerId, #consumer{credit = Credit} = Con,
ServiceQueue0) ->
case Credit > 0 of
true ->
% consumerect needs service - check if already on service queue
- uniq_queue_in(ConsumerId, ServiceQueue0);
+ uniq_queue_in(ConsumerId, Con, ServiceQueue0);
false ->
ServiceQueue0
end.
@@ -2101,3 +2111,12 @@ is_expired(Ts, #?MODULE{cfg = #cfg{expires = Expires},
Ts > (LastActive + Expires) andalso maps:size(Active) == 0;
is_expired(_Ts, _State) ->
false.
+
+get_priority_from_args(#{args := Args}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
+ {_Key, Value} ->
+ Value;
+ _ -> 0
+ end;
+get_priority_from_args(_) ->
+ 0.
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 4bd88fa705..a44a2bc04d 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -94,7 +94,8 @@
%% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
lifetime = once :: once | auto,
- status = up :: up | suspected_down | cancelled
+ status = up :: up | suspected_down | cancelled,
+ priority = 0 :: non_neg_integer()
}).
-type consumer() :: #consumer{}.
@@ -169,7 +170,7 @@
consumers = #{} :: #{consumer_id() => #consumer{}},
% consumers that require further service are queued here
% needs to be part of snapshot
- service_queue = queue:new() :: queue:queue(consumer_id()),
+ service_queue = priority_queue:new() :: priority_queue:queue(consumer_id()),
%% This is a special field that is only used for snapshots
%% It represents the queued messages at the time the
%% dehydrated snapshot state was cached.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 942b42f5e4..ecb4fdac63 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -132,7 +132,8 @@ all_tests() ->
delete_if_empty,
delete_if_unused,
queue_ttl,
- peek
+ peek,
+ consumer_priorities
].
memory_tests() ->
@@ -2540,6 +2541,84 @@ queue_ttl(Config) ->
{<<"x-expires">>, long, 1000}]})),
ok.
+consumer_priorities(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ qos(Ch, 2, false),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% consumer with default priority
+ Tag1 = <<"ctag1">>,
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = QQ,
+ no_ack = false,
+ consumer_tag = Tag1},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = Tag1} ->
+ ok
+ end,
+ %% consumer with higher priority
+ Tag2 = <<"ctag2">>,
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = QQ,
+ arguments = [{"x-priority", long, 10}],
+ no_ack = false,
+ consumer_tag = Tag2},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = Tag2} ->
+ ok
+ end,
+
+ publish(Ch, QQ),
+ %% Tag2 should receive the message
+ DT1 = receive
+ {#'basic.deliver'{delivery_tag = D1,
+ consumer_tag = Tag2}, _} ->
+ D1
+ after 5000 ->
+ flush(100),
+ ct:fail("basic.deliver timeout")
+ end,
+ publish(Ch, QQ),
+ %% Tag2 should receive the message
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ consumer_tag = Tag2}, _} ->
+ ok
+ after 5000 ->
+ flush(100),
+ ct:fail("basic.deliver timeout")
+ end,
+
+ publish(Ch, QQ),
+ %% Tag1 should receive the message as Tag2 has maxed qos
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ consumer_tag = Tag1}, _} ->
+ ok
+ after 5000 ->
+ flush(100),
+ ct:fail("basic.deliver timeout")
+ end,
+
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DT1,
+ multiple = false}),
+ publish(Ch, QQ),
+ %% Tag2 should receive the message
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ consumer_tag = Tag2}, _} ->
+ ok
+ after 5000 ->
+ flush(100),
+ ct:fail("basic.deliver timeout")
+ end,
+
+ ok.
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 8431dd8db7..59a33eab4e 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -1509,10 +1509,13 @@ machine_version_test(_) ->
{S1, _Effects} = rabbit_fifo_v0_SUITE:run_log(S0, Entries),
Self = self(),
{#rabbit_fifo{enqueuers = #{Self := #enqueuer{}},
+ consumers = #{Cid := #consumer{priority = 0}},
+ service_queue = S,
messages = Msgs}, ok, []} = apply(meta(Idx),
{machine_version, 0, 1}, S1),
%% validate message conversion to lqueue
?assertEqual(1, lqueue:len(Msgs)),
+ ?assert(priority_queue:is_queue(S)),
ok.
queue_ttl_test(_) ->
@@ -1631,6 +1634,29 @@ query_peek_test(_) ->
?assertEqual({error, no_message_at_pos}, rabbit_fifo:query_peek(3, State2)),
ok.
+checkout_priority_test(_) ->
+ Cid = {<<"checkout_priority_test">>, self()},
+ Pid = spawn(fun () -> ok end),
+ Cid2 = {<<"checkout_priority_test2">>, Pid},
+ Args = [{<<"x-priority">>, long, 1}],
+ {S1, _, _} =
+ apply(meta(3),
+ rabbit_fifo:make_checkout(Cid, {once, 2, simple_prefetch},
+ #{args => Args}),
+ test_init(test)),
+ {S2, _, _} =
+ apply(meta(3),
+ rabbit_fifo:make_checkout(Cid2, {once, 2, simple_prefetch},
+ #{args => []}),
+ S1),
+ {S3, E3} = enq(1, 1, first, S2),
+ ?ASSERT_EFF({send_msg, P, {delivery, _, _}, _}, P == self(), E3),
+ {S4, E4} = enq(2, 2, second, S3),
+ ?ASSERT_EFF({send_msg, P, {delivery, _, _}, _}, P == self(), E4),
+ {_S5, E5} = enq(3, 3, third, S4),
+ ?ASSERT_EFF({send_msg, P, {delivery, _, _}, _}, P == Pid, E5),
+ ok.
+
%% Utility
init(Conf) -> rabbit_fifo:init(Conf).