summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-08-21 15:41:43 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-08-21 15:41:43 +0100
commit52aacccf08ce3a317845d41fffb04b6329324580 (patch)
tree4b1badb49b35fb8568098fbab55e43ee54cabe7d
parentb78f55bda18f00f5c19eb4c7eea0b247e9767c18 (diff)
parentba3313ec9d8c0460786b79992c4c37a2619fb5fb (diff)
downloadrabbitmq-server-52aacccf08ce3a317845d41fffb04b6329324580.tar.gz
Merge default
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/priority_queue.erl37
-rw-r--r--src/rabbit_amqqueue.erl29
-rw-r--r--src/rabbit_amqqueue_process.erl147
-rw-r--r--src/rabbit_channel.erl17
-rw-r--r--src/rabbit_policy.erl14
-rw-r--r--src/rabbit_queue_decorator.erl48
-rw-r--r--src/rabbit_registry.erl1
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_upgrade_functions.erl17
10 files changed, 241 insertions, 73 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 6df44bea..a764855e 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -45,7 +45,7 @@
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
arguments, pid, slave_pids, sync_slave_pids, policy,
- gm_pids}).
+ gm_pids, decorators}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 6995c3be..c76c0d33 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -40,8 +40,8 @@
-module(priority_queue).
--export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
- out/1, join/2]).
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, from_list/1,
+ in/2, in/3, out/1, out_p/1, join/2, filter/2, fold/3, highest/1]).
%%----------------------------------------------------------------------------
@@ -59,10 +59,16 @@
-spec(is_empty/1 :: (pqueue()) -> boolean()).
-spec(len/1 :: (pqueue()) -> non_neg_integer()).
-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
+-spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()).
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+-spec(out_p/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
+-spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()).
+-spec(fold/3 ::
+ (fun ((any(), priority(), A) -> A), A, pqueue()) -> A).
+-spec(highest/1 :: (pqueue()) -> priority() | 'empty').
-endif.
@@ -96,6 +102,9 @@ to_list({pqueue, Queues}) ->
[{maybe_negate_priority(P), V} || {P, Q} <- Queues,
{0, V} <- to_list(Q)].
+from_list(L) ->
+ lists:foldl(fun ({P, E}, Q) -> in(E, P, Q) end, new(), L).
+
in(Item, Q) ->
in(Item, 0, Q).
@@ -147,6 +156,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
+out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
+out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
+
+add_p(R, P) -> case R of
+ {empty, Q} -> {empty, Q};
+ {{value, V}, Q} -> {{value, V, P}, Q}
+ end.
+
join(A, {queue, [], [], 0}) ->
A;
join({queue, [], [], 0}, B) ->
@@ -185,6 +202,22 @@ merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity ->
merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
merge(As, Bs, [ {PB, B} | Acc ]).
+filter(Pred, Q) -> fold(fun(V, P, Acc) ->
+ case Pred(V) of
+ true -> in(V, P, Acc);
+ false -> Acc
+ end
+ end, new(), Q).
+
+fold(Fun, Init, Q) -> case out_p(Q) of
+ {empty, _Q} -> Init;
+ {{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1)
+ end.
+
+highest({queue, [], [], 0}) -> empty;
+highest({queue, _, _, _}) -> 0;
+highest({pqueue, [{P, _} | _]}) -> maybe_negate_priority(P).
+
r2f([], 0) -> {queue, [], [], 0};
r2f([_] = R, 1) -> {queue, [], R, 1};
r2f([X,Y], 2) -> {queue, [X], [Y], 2};
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a1efaf65..68252a5f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,7 +26,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/9, basic_cancel/4]).
+-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
@@ -79,7 +79,8 @@
-> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
-spec(update/2 ::
(name(),
- fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok').
+ fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue()))
+ -> 'not_found' | rabbit_types:amqqueue()).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found');
@@ -149,12 +150,13 @@
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
non_neg_integer(), boolean()) -> 'ok').
--spec(basic_consume/9 ::
+-spec(basic_consume/10 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any())
+ rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
+-spec(notify_decorators/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(resume/2 :: (pid(), pid()) -> 'ok').
@@ -278,9 +280,10 @@ update(Name, Fun) ->
case Durable of
true -> ok = mnesia:write(rabbit_durable_queue, Q1, write);
_ -> ok
- end;
+ end,
+ Q1;
[] ->
- ok
+ not_found
end.
store_queue(Q = #amqqueue{durable = true}) ->
@@ -294,8 +297,12 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-policy_changed(Q1, Q2) ->
+policy_changed(Q1 = #amqqueue{decorators = Decorators1},
+ Q2 = #amqqueue{decorators = Decorators2}) ->
rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
+ D1 = rabbit_queue_decorator:select(Decorators1),
+ D2 = rabbit_queue_decorator:select(Decorators2),
+ [ok = M:policy_changed(Q1, Q2) || M <- lists:usort(D1 ++ D2)],
%% Make sure we emit a stats event even if nothing
%% mirroring-related has changed - the policy may have changed anyway.
wake_up(Q1).
@@ -550,13 +557,17 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) ->
+ ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) ->
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}).
+ ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs,
+ OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+notify_decorators(#amqqueue{pid = QPid}) ->
+ delegate:cast(QPid, notify_decorators).
+
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
put(Key, case get(Key) of
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6e0eb9bf..026e6818 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -55,7 +55,7 @@
status
}).
--record(consumer, {tag, ack_required}).
+-record(consumer, {tag, ack_required, args}).
%% These are held in our process dictionary
-record(cr, {ch_pid,
@@ -137,15 +137,17 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
senders = Senders,
msg_id_to_channel = MTC},
State2 = process_args(State1),
- lists:foldl(fun (Delivery, StateN) ->
- deliver_or_enqueue(Delivery, true, StateN)
- end, State2, Deliveries).
+ State3 = lists:foldl(fun (Delivery, StateN) ->
+ deliver_or_enqueue(Delivery, true, StateN)
+ end, State2, Deliveries),
+ notify_decorators(startup, [], State3),
+ State3.
init_state(Q) ->
State = #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
- active_consumers = queue:new(),
+ active_consumers = priority_queue:new(),
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running},
@@ -198,6 +200,7 @@ declare(Recover, From, State = #q{q = Q,
recovery_barrier(Recover),
State1 = process_args(State#q{backing_queue = BQ,
backing_queue_state = BQS}),
+ notify_decorators(startup, [], State),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -222,6 +225,27 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
+notify_decorators(Event, Props, State) when Event =:= startup;
+ Event =:= shutdown ->
+ decorator_callback(qname(State), Event, Props);
+
+notify_decorators(Event, Props, State = #q{active_consumers = ACs,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ decorator_callback(
+ qname(State), notify,
+ [Event, [{max_active_consumer_priority, priority_queue:highest(ACs)},
+ {is_empty, BQ:is_empty(BQS)} | Props]]).
+
+decorator_callback(QName, F, A) ->
+ %% Look up again in case policy and hence decorators have changed
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q = #amqqueue{decorators = Ds}} ->
+ [ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)];
+ {error, not_found} ->
+ ok
+ end.
+
bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover =/= new,
@@ -275,6 +299,7 @@ terminate_shutdown(Fun, State) ->
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
+ notify_decorators(shutdown, [], State),
[emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
@@ -358,7 +383,7 @@ ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
assert_invariant(State = #q{active_consumers = AC}) ->
- true = (queue:is_empty(AC) orelse is_empty(State)).
+ true = (priority_queue:is_empty(AC) orelse is_empty(State)).
is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
@@ -377,7 +402,7 @@ ch_record(ChPid, LimiterPid) ->
monitor_ref = MonitorRef,
acktags = queue:new(),
consumer_count = 0,
- blocked_consumers = queue:new(),
+ blocked_consumers = priority_queue:new(),
limiter = Limiter,
unsent_message_count = 0},
put(Key, C),
@@ -405,15 +430,19 @@ erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
all_ch_record() -> [C || {{ch, _}, C} <- get()].
-block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
- update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
+block_consumer(C = #cr{blocked_consumers = Blocked},
+ {_ChPid, #consumer{tag = CTag}} = QEntry, State) ->
+ Blocked1 = add_consumer(QEntry, Blocked),
+ update_ch_record(C#cr{blocked_consumers = Blocked1}),
+ notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State).
is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
- true -> [send_drained(C) || C <- all_ch_record()];
+ true -> notify_decorators(queue_empty, [], State),
+ [send_drained(C) || C <- all_ch_record()];
false -> ok
end,
State.
@@ -430,42 +459,43 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
- case queue:out(ActiveConsumers) of
+ case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
{false, State};
- {{value, QEntry}, Tail} ->
+ {{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
- DeliverFun, QEntry,
+ DeliverFun, QEntry, Priority,
State#q{active_consumers = Tail}),
deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, State) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
- true -> block_consumer(C, E),
+ true -> block_consumer(C, E, State),
{false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
- block_consumer(C#cr{limiter = Limiter}, E),
+ block_consumer(C#cr{limiter = Limiter}, E, State),
{false, State};
{continue, Limiter} ->
- AC1 = queue:in(E, State#q.active_consumers),
- deliver_msg_to_consumer(
+ AC1 = priority_queue:in(E, Priority,
+ State#q.active_consumers),
+ deliver_msg_to_consumer0(
DeliverFun, Consumer, C#cr{limiter = Limiter},
State#q{active_consumers = AC1})
end
end.
-deliver_msg_to_consumer(DeliverFun,
- #consumer{tag = ConsumerTag,
- ack_required = AckRequired},
- C = #cr{ch_pid = ChPid,
- acktags = ChAckTags,
- unsent_message_count = Count},
- State = #q{q = #amqqueue{name = QName}}) ->
+deliver_msg_to_consumer0(DeliverFun,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired},
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ unsent_message_count = Count},
+ State = #q{q = #amqqueue{name = QName}}) ->
{{Message, IsDelivered, AckTag}, Stop, State1} =
DeliverFun(AckRequired, State),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
@@ -533,6 +563,13 @@ run_message_queue(State) ->
is_empty(State), State),
State1.
+add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) ->
+ Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
+ {_, P} -> P;
+ _ -> 0
+ end,
+ priority_queue:in({ChPid, Consumer}, Priority, ActiveConsumers).
+
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -629,17 +666,17 @@ requeue(AckTags, ChPid, State) ->
fun (State1) -> requeue_and_run(AckTags, State1) end).
remove_consumer(ChPid, ConsumerTag, Queue) ->
- queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
- (CP /= ChPid) or (CTag /= ConsumerTag)
- end, Queue).
+ priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
+ (CP /= ChPid) or (CTag /= ConsumerTag)
+ end, Queue).
remove_consumers(ChPid, Queue, QName) ->
- queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag, QName),
- false;
- (_) ->
- true
- end, Queue).
+ priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
+ emit_consumer_deleted(ChPid, CTag, QName),
+ false;
+ (_) ->
+ true
+ end, Queue).
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -654,18 +691,22 @@ possibly_unblock(State, ChPid, Update) ->
unblock(State, C = #cr{limiter = Limiter}) ->
case lists:partition(
- fun({_ChPid, #consumer{tag = CTag}}) ->
+ fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
- end, queue:to_list(C#cr.blocked_consumers)) of
+ end, priority_queue:to_list(C#cr.blocked_consumers)) of
{_, []} ->
update_ch_record(C),
State;
{Blocked, Unblocked} ->
- BlockedQ = queue:from_list(Blocked),
- UnblockedQ = queue:from_list(Unblocked),
+ BlockedQ = priority_queue:from_list(Blocked),
+ UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- AC1 = queue:join(State#q.active_consumers, UnblockedQ),
- run_message_queue(State#q{active_consumers = AC1})
+ AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
+ State1 = State#q{active_consumers = AC1},
+ [notify_decorators(
+ consumer_unblocked, [{consumer_tag, CTag}], State1) ||
+ {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
+ run_message_queue(State1)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -1002,9 +1043,9 @@ consumers(#q{active_consumers = ActiveConsumers}) ->
consumers(ActiveConsumers, []), all_ch_record()).
consumers(Consumers, Acc) ->
- rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = CTag, ack_required = AckRequired}}, Acc1) ->
- [{ChPid, CTag, AckRequired} | Acc1]
+ priority_queue:fold(
+ fun ({ChPid, #consumer{tag = CTag, ack_required = AckReq}}, _P, Acc1) ->
+ [{ChPid, CTag, AckReq} | Acc1]
end, Acc, Consumers).
emit_stats(State) ->
@@ -1134,7 +1175,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg},
+ ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg},
_From, State = #q{exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
@@ -1157,8 +1198,9 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
true -> send_drained(C1);
false -> ok
end,
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck},
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not NoAck,
+ args = OtherArgs},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
end,
@@ -1167,8 +1209,11 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, qname(State1)),
- AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers),
- reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
+ AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers),
+ State2 = State1#q{active_consumers = AC1},
+ notify_decorators(
+ basic_consume, [{consumer_tag, ConsumerTag}], State2),
+ reply(ok, run_message_queue(State2))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
@@ -1198,6 +1243,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
State#q.active_consumers)},
+ notify_decorators(
+ basic_cancel, [{consumer_tag, ConsumerTag}], State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)
@@ -1374,6 +1421,10 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
end
end);
+handle_cast(notify_decorators, State) ->
+ notify_decorators(refresh, [], State),
+ noreply(State);
+
handle_cast(wake_up, State) ->
noreply(State).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2cfbd96d..c3780581 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -734,7 +734,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait,
- arguments = Arguments},
+ arguments = Args},
_, State = #ch{conn_pid = ConnPid,
limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
@@ -755,12 +755,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
+ {CreditArgs, OtherArgs} = parse_credit_args(Args),
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
- parse_credit_args(Arguments),
+ CreditArgs, OtherArgs,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -1247,12 +1248,12 @@ handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
parse_credit_args(Arguments) ->
case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
- {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
- rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {boolean, Drain}} -> {Credit, Drain};
- _ -> none
- end;
- undefined -> none
+ {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
+ _ -> none
+ end, lists:keydelete(<<"x-credit">>, 1, Arguments)};
+ undefined -> {none, Arguments}
end.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 0785d278..738fa92a 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -46,7 +46,8 @@ name(#exchange{policy = Policy}) -> name0(Policy).
name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
-set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
+set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set(
+ Q#amqqueue{policy = set0(Name)});
set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
X#exchange{policy = set0(Name)}).
@@ -228,9 +229,14 @@ update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
case match(QName, Policies) of
OldPolicy -> no_change;
- NewPolicy -> rabbit_amqqueue:update(
- QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end),
- {Q, Q#amqqueue{policy = NewPolicy}}
+ NewPolicy -> case rabbit_amqqueue:update(
+ QName, fun(Q1) ->
+ rabbit_queue_decorator:set(
+ Q1#amqqueue{policy = NewPolicy})
+ end) of
+ #amqqueue{} = Q1 -> {Q, Q1};
+ not_found -> {Q, Q }
+ end
end.
notify(no_change)->
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
new file mode 100644
index 00000000..8f6375a5
--- /dev/null
+++ b/src/rabbit_queue_decorator.erl
@@ -0,0 +1,48 @@
+-module(rabbit_queue_decorator).
+
+-include("rabbit.hrl").
+
+-export([select/1, set/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(notify_event() :: 'consumer_blocked' |
+ 'consumer_unblocked' |
+ 'queue_empty' |
+ 'basic_consume' |
+ 'basic_cancel' |
+ 'refresh').
+
+-callback startup(rabbit_types:amqqueue()) -> 'ok'.
+
+-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
+
+-callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) ->
+ 'ok'.
+
+-callback active_for(rabbit_types:amqqueue()) -> boolean().
+
+-callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'.
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
+ {active_for, 1}, {notify, 3}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+select(Modules) ->
+ [M || M <- Modules, code:which(M) =/= non_existing].
+
+set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.
+
+list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index f933e4e9..3014aeb7 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -130,6 +130,7 @@ class_module(exchange) -> rabbit_exchange_type;
class_module(auth_mechanism) -> rabbit_auth_mechanism;
class_module(runtime_parameter) -> rabbit_runtime_parameter;
class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(queue_decorator) -> rabbit_queue_decorator;
class_module(policy_validator) -> rabbit_policy_validator;
class_module(ha_mode) -> rabbit_mirror_queue_mode.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5af4969a..76421d1a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1168,7 +1168,7 @@ test_server_status() ->
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, Limiter, false, <<"ctag">>, true, none, undefined),
+ Q, true, Ch, Limiter, false, <<"ctag">>, true, none, [], undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index d50cb282..6f95ef60 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -45,6 +45,7 @@
-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
+-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
%% -------------------------------------------------------------------
@@ -72,6 +73,7 @@
-spec(gm_pids/0 :: () -> 'ok').
-spec(exchange_decorators/0 :: () -> 'ok').
-spec(policy_apply_to/0 :: () -> 'ok').
+-spec(queue_decorators/0 :: () -> 'ok').
-endif.
@@ -323,6 +325,21 @@ apply_to(Def) ->
[_, _] -> <<"all">>
end.
+queue_decorators() ->
+ ok = queue_decorators(rabbit_queue),
+ ok = queue_decorators(rabbit_durable_queue).
+
+queue_decorators(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids, []}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, policy, gm_pids, decorators]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->