summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-06-26 16:31:39 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-06-26 16:31:39 +0100
commit7745cbd876ae61344a3a8485c9456b39a10605cc (patch)
tree73653bb03816cba0f1bb465134cb341747169bd3
parent5f00855ba0ca440abe4efef80461b8aea1be3601 (diff)
downloadrabbitmq-server-7745cbd876ae61344a3a8485c9456b39a10605cc.tar.gz
Convert the x-purpose hack into a bit less of a hack: consumer priorities.
-rw-r--r--src/priority_queue.erl30
-rw-r--r--src/rabbit_amqqueue_process.erl80
2 files changed, 64 insertions, 46 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 18e1e8d9..0ffd208a 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -40,8 +40,8 @@
-module(priority_queue).
--export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
- out/1, join/2]).
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, from_list/1,
+ in/2, in/3, out/1, join/2, filter/2, fold/3]).
%%----------------------------------------------------------------------------
@@ -59,10 +59,13 @@
-spec(is_empty/1 :: (pqueue()) -> boolean()).
-spec(len/1 :: (pqueue()) -> non_neg_integer()).
-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
+-spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()).
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
+-spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()).
+-spec(fold/3 :: (fun ((any(), any()) -> any()), any(), pqueue()) -> any()).
-endif.
@@ -96,6 +99,9 @@ to_list({pqueue, Queues}) ->
[{maybe_negate_priority(P), V} || {P, Q} <- Queues,
{0, V} <- to_list(Q)].
+from_list(L) ->
+ lists:foldl(fun ({P, E}, Q) -> in(E, P, Q) end, new(), L).
+
in(Item, Q) ->
in(Item, 0, Q).
@@ -147,6 +153,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
+out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
+out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), P).
+
+add_p(R, P) -> case R of
+ {empty, Q} -> {empty, Q};
+ {{value, V}, Q} -> {{value, V, P}, Q}
+ end.
+
join(A, {queue, [], [], 0}) ->
A;
join({queue, [], [], 0}, B) ->
@@ -185,6 +199,18 @@ merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity ->
merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
merge(As, Bs, [ {PB, B} | Acc ]).
+filter(Pred, Q) -> fold(fun(V, P, Acc) ->
+ case Pred(V) of
+ true -> in(V, P, Acc);
+ false -> Acc
+ end
+ end, new(), Q).
+
+fold(Fun, Init, Q) -> case out_p(Q) of
+ {empty, _Q} -> Init;
+ {{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1)
+ end.
+
r2f([], 0) -> {queue, [], [], 0};
r2f([_] = R, 1) -> {queue, [], R, 1};
r2f([X,Y], 2) -> {queue, [X], [Y], 2};
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index bafc12dd..e3eeded9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -145,7 +145,7 @@ init_state(Q) ->
State = #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
- active_consumers = queue:new(),
+ active_consumers = priority_queue:new(),
senders = pmon:new(),
msg_id_to_channel = gb_trees:empty(),
status = running},
@@ -360,7 +360,7 @@ ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
assert_invariant(State = #q{active_consumers = AC}) ->
- true = (queue:is_empty(AC) orelse is_empty(State)).
+ true = (priority_queue:is_empty(AC) orelse is_empty(State)).
is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
@@ -379,7 +379,7 @@ ch_record(ChPid, LimiterPid) ->
monitor_ref = MonitorRef,
acktags = queue:new(),
consumer_count = 0,
- blocked_consumers = queue:new(),
+ blocked_consumers = priority_queue:new(),
limiter = Limiter,
unsent_message_count = 0},
put(Key, C),
@@ -408,7 +408,8 @@ erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
all_ch_record() -> [C || {{ch, _}, C} <- get()].
block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
- update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
+ Blocked1 = priority_queue:in(QEntry, consumer_priority(QEntry), Blocked),
+ update_ch_record(C#cr{blocked_consumers = Blocked1}).
is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
@@ -432,7 +433,7 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
- case pick_consumer(ActiveConsumers) of
+ case priority_queue:out(ActiveConsumers) of
{empty, _} ->
{false, State};
{{value, QEntry}, Tail} ->
@@ -456,7 +457,8 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
notify_federation(State),
{false, State};
{continue, Limiter} ->
- AC1 = queue:in(E, State#q.active_consumers),
+ AC1 = priority_queue:in(E, consumer_priority(E),
+ State#q.active_consumers),
deliver_msg_to_consumer(
DeliverFun, Consumer, C#cr{limiter = Limiter},
State#q{active_consumers = AC1})
@@ -549,26 +551,14 @@ notify_federation(#q{q = Q,
end.
active_unfederated(Cs) ->
- case queue:out(Cs) of
- {empty, _} -> false;
- {{value, C}, Cs1} -> case federated_consumer(C) of
- true -> active_unfederated(Cs1);
- false -> true
- end
- end.
-
-%% TODO this could be more efficient. But we'd need another representation,
-%% and thus to abstract the representation of active_consumers.
-pick_consumer(Cs) ->
- case lists:splitwith(fun federated_consumer/1, queue:to_list(Cs)) of
- {_, []} -> queue:out(Cs);
- {Feds, [UnFed|Tl]} -> queue:out(queue:from_list([UnFed | Feds ++ Tl]))
- end.
-
-federated_consumer({_Pid, #consumer{args = Args}}) ->
- case rabbit_misc:table_lookup(Args, <<"x-purpose">>) of
- {longstr, <<"federation">>} -> true;
- _ -> false
+ %% TODO could this be faster?
+ lists:any(fun ({Priority, _Consumer}) -> Priority < 0 end,
+ priority_queue:to_list(Cs)).
+
+consumer_priority({_ChPid, #consumer{args = Args}}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
+ {_, Priority} -> Priority;
+ _ -> 0
end.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
@@ -668,17 +658,17 @@ requeue(AckTags, ChPid, State) ->
fun (State1) -> requeue_and_run(AckTags, State1) end).
remove_consumer(ChPid, ConsumerTag, Queue) ->
- queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
- (CP /= ChPid) or (CTag /= ConsumerTag)
- end, Queue).
+ priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
+ (CP /= ChPid) or (CTag /= ConsumerTag)
+ end, Queue).
remove_consumers(ChPid, Queue, QName) ->
- queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag, QName),
- false;
- (_) ->
- true
- end, Queue).
+ priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
+ emit_consumer_deleted(ChPid, CTag, QName),
+ false;
+ (_) ->
+ true
+ end, Queue).
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -693,17 +683,17 @@ possibly_unblock(State, ChPid, Update) ->
unblock(State, C = #cr{limiter = Limiter}) ->
case lists:partition(
- fun({_ChPid, #consumer{tag = CTag}}) ->
+ fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
- end, queue:to_list(C#cr.blocked_consumers)) of
+ end, priority_queue:to_list(C#cr.blocked_consumers)) of
{_, []} ->
update_ch_record(C),
State;
{Blocked, Unblocked} ->
- BlockedQ = queue:from_list(Blocked),
- UnblockedQ = queue:from_list(Unblocked),
+ BlockedQ = priority_queue:from_list(Blocked),
+ UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- AC1 = queue:join(State#q.active_consumers, UnblockedQ),
+ AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
run_message_queue(State#q{active_consumers = AC1})
end.
@@ -1041,9 +1031,9 @@ consumers(#q{active_consumers = ActiveConsumers}) ->
consumers(ActiveConsumers, []), all_ch_record()).
consumers(Consumers, Acc) ->
- rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = CTag, ack_required = AckRequired}}, Acc1) ->
- [{ChPid, CTag, AckRequired} | Acc1]
+ priority_queue:fold(
+ fun ({ChPid, #consumer{tag = CTag, ack_required = AckReq}}, _P, Acc1) ->
+ [{ChPid, CTag, AckReq} | Acc1]
end, Acc, Consumers).
emit_stats(State) ->
@@ -1208,7 +1198,9 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, qname(State1)),
- AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers),
+ AC1 = priority_queue:in({ChPid, Consumer},
+ consumer_priority({ChPid, Consumer}),
+ State1#q.active_consumers),
reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;