summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-08-21 11:09:34 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-08-21 11:09:34 +0100
commit4c602c8642924d966738eebb1e05b906d43ffec9 (patch)
tree3d439eb8535d1e0d865043c88e96587c011e5a38
parente8237d531a62b7ad5b787fcf8c301082ed507917 (diff)
downloadrabbitmq-server-4c602c8642924d966738eebb1e05b906d43ffec9.tar.gz
Small changes: Don't reevaluate consumer priority on every delivery, refactor a touch, realign, fix test, more specific spec.
-rw-r--r--src/priority_queue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_tests.erl2
3 files changed, 18 insertions, 18 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index ef45ef66..c76c0d33 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -41,7 +41,7 @@
-module(priority_queue).
-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, from_list/1,
- in/2, in/3, out/1, join/2, filter/2, fold/3, highest/1]).
+ in/2, in/3, out/1, out_p/1, join/2, filter/2, fold/3, highest/1]).
%%----------------------------------------------------------------------------
@@ -63,10 +63,11 @@
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+-spec(out_p/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()).
-spec(fold/3 ::
- (fun ((any(), priority(), any()) -> any()), any(), pqueue()) -> any()).
+ (fun ((any(), priority(), A) -> A), A, pqueue()) -> A).
-spec(highest/1 :: (pqueue()) -> priority() | 'empty').
-endif.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4d8e1c19..5ddd14a9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -430,7 +430,7 @@ all_ch_record() -> [C || {{ch, _}, C} <- get()].
block_consumer(C = #cr{blocked_consumers = Blocked},
{_ChPid, #consumer{tag = CTag}} = QEntry, State) ->
- Blocked1 = priority_queue:in(QEntry, consumer_priority(QEntry), Blocked),
+ Blocked1 = add_consumer(QEntry, Blocked),
update_ch_record(C#cr{blocked_consumers = Blocked1}),
notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State).
@@ -456,17 +456,17 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
- case priority_queue:out(ActiveConsumers) of
+ case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
{false, State};
- {{value, QEntry}, Tail} ->
+ {{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
- DeliverFun, QEntry,
+ DeliverFun, {QEntry, Priority},
State#q{active_consumers = Tail}),
deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
+deliver_msg_to_consumer(DeliverFun, {E = {ChPid, Consumer}, Priority}, State) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E, State),
@@ -478,7 +478,7 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
block_consumer(C#cr{limiter = Limiter}, E, State),
{false, State};
{continue, Limiter} ->
- AC1 = priority_queue:in(E, consumer_priority(E),
+ AC1 = priority_queue:in(E, Priority,
State#q.active_consumers),
deliver_msg_to_consumer(
DeliverFun, Consumer, C#cr{limiter = Limiter},
@@ -561,11 +561,12 @@ run_message_queue(State) ->
notify_decorators(queue_run_finished, [], State1),
State1.
-consumer_priority({_ChPid, #consumer{args = Args}}) ->
- case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
- {_, Priority} -> Priority;
- _ -> 0
- end.
+add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) ->
+ Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
+ {_, P} -> P;
+ _ -> 0
+ end,
+ priority_queue:in({ChPid, Consumer}, Priority, ActiveConsumers).
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
@@ -1195,7 +1196,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
true -> send_drained(C1);
false -> ok
end,
- Consumer = #consumer{tag = ConsumerTag,
+ Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck,
args = OtherArgs},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1206,9 +1207,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, qname(State1)),
- AC1 = priority_queue:in({ChPid, Consumer},
- consumer_priority({ChPid, Consumer}),
- State1#q.active_consumers),
+ AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers),
State2 = State1#q{active_consumers = AC1},
notify_decorators(
basic_consume, [{consumer_tag, ConsumerTag}], State2),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5af4969a..76421d1a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1168,7 +1168,7 @@ test_server_status() ->
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, Limiter, false, <<"ctag">>, true, none, undefined),
+ Q, true, Ch, Limiter, false, <<"ctag">>, true, none, [], undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),