diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-08-21 11:09:34 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-08-21 11:09:34 +0100 |
commit | 4c602c8642924d966738eebb1e05b906d43ffec9 (patch) | |
tree | 3d439eb8535d1e0d865043c88e96587c011e5a38 | |
parent | e8237d531a62b7ad5b787fcf8c301082ed507917 (diff) | |
download | rabbitmq-server-4c602c8642924d966738eebb1e05b906d43ffec9.tar.gz |
Small changes: Don't reevaluate consumer priority on every delivery, refactor a touch, realign, fix test, more specific spec.
-rw-r--r-- | src/priority_queue.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 |
3 files changed, 18 insertions, 18 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl index ef45ef66..c76c0d33 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -41,7 +41,7 @@ -module(priority_queue). -export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, from_list/1, - in/2, in/3, out/1, join/2, filter/2, fold/3, highest/1]). + in/2, in/3, out/1, out_p/1, join/2, filter/2, fold/3, highest/1]). %%---------------------------------------------------------------------------- @@ -63,10 +63,11 @@ -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). +-spec(out_p/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}). -spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). -spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()). -spec(fold/3 :: - (fun ((any(), priority(), any()) -> any()), any(), pqueue()) -> any()). + (fun ((any(), priority(), A) -> A), A, pqueue()) -> A). -spec(highest/1 :: (pqueue()) -> priority() | 'empty'). -endif. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4d8e1c19..5ddd14a9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -430,7 +430,7 @@ all_ch_record() -> [C || {{ch, _}, C} <- get()]. block_consumer(C = #cr{blocked_consumers = Blocked}, {_ChPid, #consumer{tag = CTag}} = QEntry, State) -> - Blocked1 = priority_queue:in(QEntry, consumer_priority(QEntry), Blocked), + Blocked1 = add_consumer(QEntry, Blocked), update_ch_record(C#cr{blocked_consumers = Blocked1}), notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State). @@ -456,17 +456,17 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> - case priority_queue:out(ActiveConsumers) of + case priority_queue:out_p(ActiveConsumers) of {empty, _} -> {false, State}; - {{value, QEntry}, Tail} -> + {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer( - DeliverFun, QEntry, + DeliverFun, {QEntry, Priority}, State#q{active_consumers = Tail}), deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> +deliver_msg_to_consumer(DeliverFun, {E = {ChPid, Consumer}, Priority}, State) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E, State), @@ -478,7 +478,7 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> block_consumer(C#cr{limiter = Limiter}, E, State), {false, State}; {continue, Limiter} -> - AC1 = priority_queue:in(E, consumer_priority(E), + AC1 = priority_queue:in(E, Priority, State#q.active_consumers), deliver_msg_to_consumer( DeliverFun, Consumer, C#cr{limiter = Limiter}, @@ -561,11 +561,12 @@ run_message_queue(State) -> notify_decorators(queue_run_finished, [], State1), State1. -consumer_priority({_ChPid, #consumer{args = Args}}) -> - case rabbit_misc:table_lookup(Args, <<"x-priority">>) of - {_, Priority} -> Priority; - _ -> 0 - end. +add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) -> + Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of + {_, P} -> P; + _ -> 0 + end, + priority_queue:in({ChPid, Consumer}, Priority, ActiveConsumers). attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, @@ -1195,7 +1196,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, true -> send_drained(C1); false -> ok end, - Consumer = #consumer{tag = ConsumerTag, + Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck, args = OtherArgs}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -1206,9 +1207,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, qname(State1)), - AC1 = priority_queue:in({ChPid, Consumer}, - consumer_priority({ChPid, Consumer}), - State1#q.active_consumers), + AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers), State2 = State1#q{active_consumers = AC1}, notify_decorators( basic_consume, [{consumer_tag, ConsumerTag}], State2), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 5af4969a..76421d1a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1168,7 +1168,7 @@ test_server_status() -> rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none)]], ok = rabbit_amqqueue:basic_consume( - Q, true, Ch, Limiter, false, <<"ctag">>, true, none, undefined), + Q, true, Ch, Limiter, false, <<"ctag">>, true, none, [], undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), |