diff options
author | Michael Klishin <klishinm@vmware.com> | 2020-09-29 12:48:49 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-29 12:48:49 +0300 |
commit | 6f9558e937140da7e0f7fda25e3099a57b2e2345 (patch) | |
tree | 240c34261d91aaf06524decd57a59d5602b61f73 | |
parent | 119d2c3fcf77ff7fad428fb0e4150005e9ca581d (diff) | |
parent | 4a916b0057e8daf319fea4e020171aaadd4333d1 (diff) | |
download | rabbitmq-server-git-6f9558e937140da7e0f7fda25e3099a57b2e2345.tar.gz |
Merge pull request #2451 from rabbitmq/qq-consumer-priorities
Quorum queue consumer priorities
-rw-r--r-- | src/rabbit_fifo.erl | 65 | ||||
-rw-r--r-- | src/rabbit_fifo.hrl | 5 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 81 | ||||
-rw-r--r-- | test/rabbit_fifo_SUITE.erl | 26 |
4 files changed, 151 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 85d173f39a..03d1625b9c 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -305,7 +305,7 @@ apply(#{index := Index, {State0, {dequeue, empty}}; Ready -> State1 = update_consumer(ConsumerId, ConsumerMeta, - {once, 1, simple_prefetch}, + {once, 1, simple_prefetch}, 0, State0), {success, _, MsgId, Msg, State2} = checkout_one(Meta, State1), {State4, Effects1} = case Settlement of @@ -345,7 +345,8 @@ apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, State0) -> - State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0), + Priority = get_priority_from_args(ConsumerMeta), + State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, Priority, State0), checkout(Meta, State0, State1, [{monitor, process, Pid}]); apply(#{index := Index}, #purge{}, #?MODULE{ra_indexes = Indexes0, @@ -524,6 +525,14 @@ convert_v0_to_v1(V0State0) -> pending = element(3, E), status = element(4, E)} end, V0Enqs), + V0Cons = rabbit_fifo_v0:get_field(consumers, V0State), + V1Cons = maps:map( + fun (_CId, C0) -> + %% add the priority field + list_to_tuple(tuple_to_list(C0) ++ [0]) + end, V0Cons), + V0SQ = rabbit_fifo_v0:get_field(service_queue, V0State), + V1SQ = priority_queue:from_list(queue:to_list(V0SQ)), Cfg = #cfg{name = rabbit_fifo_v0:get_cfg_field(name, V0State), resource = rabbit_fifo_v0:get_cfg_field(resource, V0State), release_cursor_interval = rabbit_fifo_v0:get_cfg_field(release_cursor_interval, V0State), @@ -547,8 +556,8 @@ convert_v0_to_v1(V0State0) -> enqueuers = V1Enqs, ra_indexes = rabbit_fifo_v0:get_field(ra_indexes, V0State), release_cursors = rabbit_fifo_v0:get_field(release_cursors, V0State), - consumers = rabbit_fifo_v0:get_field(consumers, V0State), - service_queue = rabbit_fifo_v0:get_field(service_queue, V0State), + consumers = V1Cons, + service_queue = V1SQ, prefix_msgs = rabbit_fifo_v0:get_field(prefix_msgs, V0State), msg_bytes_enqueue = rabbit_fifo_v0:get_field(msg_bytes_enqueue, V0State), msg_bytes_checkout = rabbit_fifo_v0:get_field(msg_bytes_checkout, V0State), @@ -1673,13 +1682,12 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> end}. checkout_one(Meta, #?MODULE{service_queue = SQ0, - messages = Messages0, - consumers = Cons0} = InitState) -> - case queue:peek(SQ0) of - {value, ConsumerId} -> + messages = Messages0, + consumers = Cons0} = InitState) -> + case priority_queue:out(SQ0) of + {{value, ConsumerId}, SQ1} -> case take_next_msg(InitState) of {ConsumerMsg, State0} -> - SQ1 = queue:drop(SQ0), %% there are consumers waiting to be serviced %% process consumer checkout case maps:find(ConsumerId, Cons0) of @@ -1730,7 +1738,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, empty -> {nochange, InitState} end; - empty -> + {empty, _} -> case lqueue:len(Messages0) of 0 -> {nochange, InitState}; _ -> {inactive, InitState} @@ -1745,7 +1753,7 @@ update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto} = Con, #?MODULE{consumers = Cons, service_queue = ServiceQueue} = State) -> State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons), - service_queue = uniq_queue_in(ConsumerId, ServiceQueue)}; + service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)}; update_or_remove_sub(#{system_time := Ts}, ConsumerId, #consumer{lifetime = once, checked_out = Checked, @@ -1764,43 +1772,45 @@ update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = once} = Con, #?MODULE{consumers = Cons, service_queue = ServiceQueue} = State) -> State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons), - service_queue = uniq_queue_in(ConsumerId, ServiceQueue)}. + service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)}. -uniq_queue_in(Key, Queue) -> +uniq_queue_in(Key, #consumer{priority = P}, Queue) -> % TODO: queue:member could surely be quite expensive, however the practical % number of unique consumers may not be large enough for it to matter - case queue:member(Key, Queue) of + case priority_queue:member(Key, Queue) of true -> Queue; false -> - queue:in(Key, Queue) + priority_queue:in(Key, P, Queue) end. -update_consumer(ConsumerId, Meta, Spec, +update_consumer(ConsumerId, Meta, Spec, Priority, #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) -> %% general case, single active consumer off - update_consumer0(ConsumerId, Meta, Spec, State0); -update_consumer(ConsumerId, Meta, Spec, + update_consumer0(ConsumerId, Meta, Spec, Priority, State0); +update_consumer(ConsumerId, Meta, Spec, Priority, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}} = State0) when map_size(Cons0) == 0 -> %% single active consumer on, no one is consuming yet - update_consumer0(ConsumerId, Meta, Spec, State0); -update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, + update_consumer0(ConsumerId, Meta, Spec, Priority, State0); +update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, Priority, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = WaitingConsumers0} = State0) -> %% single active consumer on and one active consumer already %% adding the new consumer to the waiting list Consumer = #consumer{lifetime = Life, meta = Meta, + priority = Priority, credit = Credit, credit_mode = Mode}, WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}], State0#?MODULE{waiting_consumers = WaitingConsumers1}. -update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, +update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, Priority, #?MODULE{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> %% TODO: this logic may not be correct for updating a pre-existing consumer Init = #consumer{lifetime = Life, meta = Meta, + priority = Priority, credit = Credit, credit_mode = Mode}, Cons = maps:update_with(ConsumerId, fun(S) -> @@ -1814,12 +1824,12 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, ServiceQueue0), State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}. -maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, +maybe_queue_consumer(ConsumerId, #consumer{credit = Credit} = Con, ServiceQueue0) -> case Credit > 0 of true -> % consumerect needs service - check if already on service queue - uniq_queue_in(ConsumerId, ServiceQueue0); + uniq_queue_in(ConsumerId, Con, ServiceQueue0); false -> ServiceQueue0 end. @@ -2104,3 +2114,12 @@ is_expired(Ts, #?MODULE{cfg = #cfg{expires = Expires}, Ts > (LastActive + Expires) andalso maps:size(Active) == 0; is_expired(_Ts, _State) -> false. + +get_priority_from_args(#{args := Args}) -> + case rabbit_misc:table_lookup(Args, <<"x-priority">>) of + {_Key, Value} -> + Value; + _ -> 0 + end; +get_priority_from_args(_) -> + 0. diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index 4bd88fa705..a44a2bc04d 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -94,7 +94,8 @@ %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}' credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data lifetime = once :: once | auto, - status = up :: up | suspected_down | cancelled + status = up :: up | suspected_down | cancelled, + priority = 0 :: non_neg_integer() }). -type consumer() :: #consumer{}. @@ -169,7 +170,7 @@ consumers = #{} :: #{consumer_id() => #consumer{}}, % consumers that require further service are queued here % needs to be part of snapshot - service_queue = queue:new() :: queue:queue(consumer_id()), + service_queue = priority_queue:new() :: priority_queue:queue(consumer_id()), %% This is a special field that is only used for snapshots %% It represents the queued messages at the time the %% dehydrated snapshot state was cached. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 942b42f5e4..ecb4fdac63 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -132,7 +132,8 @@ all_tests() -> delete_if_empty, delete_if_unused, queue_ttl, - peek + peek, + consumer_priorities ]. memory_tests() -> @@ -2540,6 +2541,84 @@ queue_ttl(Config) -> {<<"x-expires">>, long, 1000}]})), ok. +consumer_priorities(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + qos(Ch, 2, false), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% consumer with default priority + Tag1 = <<"ctag1">>, + amqp_channel:subscribe(Ch, #'basic.consume'{queue = QQ, + no_ack = false, + consumer_tag = Tag1}, + self()), + receive + #'basic.consume_ok'{consumer_tag = Tag1} -> + ok + end, + %% consumer with higher priority + Tag2 = <<"ctag2">>, + amqp_channel:subscribe(Ch, #'basic.consume'{queue = QQ, + arguments = [{"x-priority", long, 10}], + no_ack = false, + consumer_tag = Tag2}, + self()), + receive + #'basic.consume_ok'{consumer_tag = Tag2} -> + ok + end, + + publish(Ch, QQ), + %% Tag2 should receive the message + DT1 = receive + {#'basic.deliver'{delivery_tag = D1, + consumer_tag = Tag2}, _} -> + D1 + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout") + end, + publish(Ch, QQ), + %% Tag2 should receive the message + receive + {#'basic.deliver'{delivery_tag = _, + consumer_tag = Tag2}, _} -> + ok + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout") + end, + + publish(Ch, QQ), + %% Tag1 should receive the message as Tag2 has maxed qos + receive + {#'basic.deliver'{delivery_tag = _, + consumer_tag = Tag1}, _} -> + ok + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout") + end, + + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DT1, + multiple = false}), + publish(Ch, QQ), + %% Tag2 should receive the message + receive + {#'basic.deliver'{delivery_tag = _, + consumer_tag = Tag2}, _} -> + ok + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout") + end, + + ok. + %%---------------------------------------------------------------------------- declare(Ch, Q) -> diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 8b3201845b..d19dcb3682 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -1509,10 +1509,13 @@ machine_version_test(_) -> {S1, _Effects} = rabbit_fifo_v0_SUITE:run_log(S0, Entries), Self = self(), {#rabbit_fifo{enqueuers = #{Self := #enqueuer{}}, + consumers = #{Cid := #consumer{priority = 0}}, + service_queue = S, messages = Msgs}, ok, []} = apply(meta(Idx), {machine_version, 0, 1}, S1), %% validate message conversion to lqueue ?assertEqual(1, lqueue:len(Msgs)), + ?assert(priority_queue:is_queue(S)), ok. queue_ttl_test(_) -> @@ -1631,6 +1634,29 @@ query_peek_test(_) -> ?assertEqual({error, no_message_at_pos}, rabbit_fifo:query_peek(3, State2)), ok. +checkout_priority_test(_) -> + Cid = {<<"checkout_priority_test">>, self()}, + Pid = spawn(fun () -> ok end), + Cid2 = {<<"checkout_priority_test2">>, Pid}, + Args = [{<<"x-priority">>, long, 1}], + {S1, _, _} = + apply(meta(3), + rabbit_fifo:make_checkout(Cid, {once, 2, simple_prefetch}, + #{args => Args}), + test_init(test)), + {S2, _, _} = + apply(meta(3), + rabbit_fifo:make_checkout(Cid2, {once, 2, simple_prefetch}, + #{args => []}), + S1), + {S3, E3} = enq(1, 1, first, S2), + ?ASSERT_EFF({send_msg, P, {delivery, _, _}, _}, P == self(), E3), + {S4, E4} = enq(2, 2, second, S3), + ?ASSERT_EFF({send_msg, P, {delivery, _, _}, _}, P == self(), E4), + {_S5, E5} = enq(3, 3, third, S4), + ?ASSERT_EFF({send_msg, P, {delivery, _, _}, _}, P == Pid, E5), + ok. + %% Utility init(Conf) -> rabbit_fifo:init(Conf). |