summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-08-22 12:06:50 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-08-22 12:06:50 +0100
commit9d7ed1b6259b34d1f87e9cc016eb8f20076db07a (patch)
treeab0bb91771df4e321d4f8b6eb12ff24363315d88
parent5a38b5e837c68ea8c86acd3edc2bc5b932271b5f (diff)
parent52aacccf08ce3a317845d41fffb04b6329324580 (diff)
downloadrabbitmq-server-9d7ed1b6259b34d1f87e9cc016eb8f20076db07a.tar.gz
Merge in bug25725.
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue.erl20
-rw-r--r--src/rabbit_amqqueue_process.erl55
-rw-r--r--src/rabbit_policy.erl14
-rw-r--r--src/rabbit_queue_decorator.erl48
-rw-r--r--src/rabbit_registry.erl1
-rw-r--r--src/rabbit_upgrade_functions.erl17
7 files changed, 139 insertions, 18 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 6df44bea..a764855e 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -45,7 +45,7 @@
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
arguments, pid, slave_pids, sync_slave_pids, policy,
- gm_pids}).
+ gm_pids, decorators}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0673ff8e..38d72479 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,7 +26,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/10, basic_cancel/4]).
+-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
@@ -79,7 +79,8 @@
-> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
-spec(update/2 ::
(name(),
- fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok').
+ fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue()))
+ -> 'not_found' | rabbit_types:amqqueue()).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found');
@@ -155,6 +156,7 @@
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
+-spec(notify_decorators/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(resume/2 :: (pid(), pid()) -> 'ok').
@@ -278,9 +280,10 @@ update(Name, Fun) ->
case Durable of
true -> ok = mnesia:write(rabbit_durable_queue, Q1, write);
_ -> ok
- end;
+ end,
+ Q1;
[] ->
- ok
+ not_found
end.
store_queue(Q = #amqqueue{durable = true}) ->
@@ -294,8 +297,12 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-policy_changed(Q1, Q2) ->
+policy_changed(Q1 = #amqqueue{decorators = Decorators1},
+ Q2 = #amqqueue{decorators = Decorators2}) ->
rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
+ D1 = rabbit_queue_decorator:select(Decorators1),
+ D2 = rabbit_queue_decorator:select(Decorators2),
+ [ok = M:policy_changed(Q1, Q2) || M <- lists:usort(D1 ++ D2)],
%% Make sure we emit a stats event even if nothing
%% mirroring-related has changed - the policy may have changed anyway.
wake_up(Q1).
@@ -568,6 +575,9 @@ basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid,
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+notify_decorators(#amqqueue{pid = QPid}) ->
+ delegate:cast(QPid, notify_decorators).
+
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
put(Key, case get(Key) of
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 972e6be0..8f2dbc84 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -137,9 +137,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
senders = Senders,
msg_id_to_channel = MTC},
State2 = process_args(State1),
- lists:foldl(fun (Delivery, StateN) ->
- deliver_or_enqueue(Delivery, true, StateN)
- end, State2, Deliveries).
+ State3 = lists:foldl(fun (Delivery, StateN) ->
+ deliver_or_enqueue(Delivery, true, StateN)
+ end, State2, Deliveries),
+ notify_decorators(startup, [], State3),
+ State3.
init_state(Q) ->
State = #q{q = Q,
@@ -198,6 +200,7 @@ declare(Recover, From, State = #q{q = Q,
recovery_barrier(Recover),
State1 = process_args(State#q{backing_queue = BQ,
backing_queue_state = BQS}),
+ notify_decorators(startup, [], State),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -222,6 +225,27 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
+notify_decorators(Event, Props, State) when Event =:= startup;
+ Event =:= shutdown ->
+ decorator_callback(qname(State), Event, Props);
+
+notify_decorators(Event, Props, State = #q{active_consumers = ACs,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ decorator_callback(
+ qname(State), notify,
+ [Event, [{max_active_consumer_priority, priority_queue:highest(ACs)},
+ {is_empty, BQ:is_empty(BQS)} | Props]]).
+
+decorator_callback(QName, F, A) ->
+ %% Look up again in case policy and hence decorators have changed
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q = #amqqueue{decorators = Ds}} ->
+ [ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)];
+ {error, not_found} ->
+ ok
+ end.
+
bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover =/= new,
@@ -275,6 +299,7 @@ terminate_shutdown(Fun, State) ->
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
+ notify_decorators(shutdown, [], State),
[emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
@@ -405,15 +430,18 @@ erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
all_ch_record() -> [C || {{ch, _}, C} <- get()].
-block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
- update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}).
+block_consumer(C = #cr{blocked_consumers = Blocked},
+ {_ChPid, #consumer{tag = CTag}} = QEntry, State) ->
+ update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}),
+ notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State).
is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
- true -> [send_drained(C) || C <- all_ch_record()];
+ true -> notify_decorators(queue_empty, [], State),
+ [send_drained(C) || C <- all_ch_record()];
false -> ok
end,
State.
@@ -443,13 +471,13 @@ deliver_msgs_to_consumers(DeliverFun, false,
deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, State) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
- true -> block_consumer(C, E),
+ true -> block_consumer(C, E, State),
{false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
- block_consumer(C#cr{limiter = Limiter}, E),
+ block_consumer(C#cr{limiter = Limiter}, E, State),
{false, State};
{continue, Limiter} ->
AC1 = priority_queue:in(E, Priority,
@@ -674,6 +702,9 @@ unblock(State, C = #cr{limiter = Limiter}) ->
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
State1 = State#q{active_consumers = AC1},
+ [notify_decorators(
+ consumer_unblocked, [{consumer_tag, CTag}], State1) ||
+ {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
run_message_queue(State1)
end.
@@ -1181,6 +1212,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
not NoAck, qname(State1), OtherArgs),
AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers),
State2 = State1#q{active_consumers = AC1},
+ notify_decorators(
+ basic_consume, [{consumer_tag, ConsumerTag}], State2),
reply(ok, run_message_queue(State2))
end;
@@ -1211,6 +1244,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
State#q.active_consumers)},
+ notify_decorators(
+ basic_cancel, [{consumer_tag, ConsumerTag}], State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)
@@ -1388,6 +1423,10 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
end
end);
+handle_cast(notify_decorators, State) ->
+ notify_decorators(refresh, [], State),
+ noreply(State);
+
handle_cast(wake_up, State) ->
noreply(State).
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 0785d278..738fa92a 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -46,7 +46,8 @@ name(#exchange{policy = Policy}) -> name0(Policy).
name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
-set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
+set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set(
+ Q#amqqueue{policy = set0(Name)});
set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
X#exchange{policy = set0(Name)}).
@@ -228,9 +229,14 @@ update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
case match(QName, Policies) of
OldPolicy -> no_change;
- NewPolicy -> rabbit_amqqueue:update(
- QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end),
- {Q, Q#amqqueue{policy = NewPolicy}}
+ NewPolicy -> case rabbit_amqqueue:update(
+ QName, fun(Q1) ->
+ rabbit_queue_decorator:set(
+ Q1#amqqueue{policy = NewPolicy})
+ end) of
+ #amqqueue{} = Q1 -> {Q, Q1};
+ not_found -> {Q, Q }
+ end
end.
notify(no_change)->
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
new file mode 100644
index 00000000..8f6375a5
--- /dev/null
+++ b/src/rabbit_queue_decorator.erl
@@ -0,0 +1,48 @@
+-module(rabbit_queue_decorator).
+
+-include("rabbit.hrl").
+
+-export([select/1, set/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(notify_event() :: 'consumer_blocked' |
+ 'consumer_unblocked' |
+ 'queue_empty' |
+ 'basic_consume' |
+ 'basic_cancel' |
+ 'refresh').
+
+-callback startup(rabbit_types:amqqueue()) -> 'ok'.
+
+-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
+
+-callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) ->
+ 'ok'.
+
+-callback active_for(rabbit_types:amqqueue()) -> boolean().
+
+-callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'.
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
+ {active_for, 1}, {notify, 3}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+select(Modules) ->
+ [M || M <- Modules, code:which(M) =/= non_existing].
+
+set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.
+
+list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index f933e4e9..3014aeb7 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -130,6 +130,7 @@ class_module(exchange) -> rabbit_exchange_type;
class_module(auth_mechanism) -> rabbit_auth_mechanism;
class_module(runtime_parameter) -> rabbit_runtime_parameter;
class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(queue_decorator) -> rabbit_queue_decorator;
class_module(policy_validator) -> rabbit_policy_validator;
class_module(ha_mode) -> rabbit_mirror_queue_mode.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index d50cb282..6f95ef60 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -45,6 +45,7 @@
-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
+-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
%% -------------------------------------------------------------------
@@ -72,6 +73,7 @@
-spec(gm_pids/0 :: () -> 'ok').
-spec(exchange_decorators/0 :: () -> 'ok').
-spec(policy_apply_to/0 :: () -> 'ok').
+-spec(queue_decorators/0 :: () -> 'ok').
-endif.
@@ -323,6 +325,21 @@ apply_to(Def) ->
[_, _] -> <<"all">>
end.
+queue_decorators() ->
+ ok = queue_decorators(rabbit_queue),
+ ok = queue_decorators(rabbit_durable_queue).
+
+queue_decorators(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids, []}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, policy, gm_pids, decorators]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->