diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-08-22 12:06:50 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-08-22 12:06:50 +0100 |
commit | 9d7ed1b6259b34d1f87e9cc016eb8f20076db07a (patch) | |
tree | ab0bb91771df4e321d4f8b6eb12ff24363315d88 | |
parent | 5a38b5e837c68ea8c86acd3edc2bc5b932271b5f (diff) | |
parent | 52aacccf08ce3a317845d41fffb04b6329324580 (diff) | |
download | rabbitmq-server-9d7ed1b6259b34d1f87e9cc016eb8f20076db07a.tar.gz |
Merge in bug25725.
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 20 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 55 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 14 | ||||
-rw-r--r-- | src/rabbit_queue_decorator.erl | 48 | ||||
-rw-r--r-- | src/rabbit_registry.erl | 1 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 17 |
7 files changed, 139 insertions, 18 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 6df44bea..a764855e 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -45,7 +45,7 @@ -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid, slave_pids, sync_slave_pids, policy, - gm_pids}). + gm_pids, decorators}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0673ff8e..38d72479 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,7 +26,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/10, basic_cancel/4]). +-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). @@ -79,7 +79,8 @@ -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). -spec(update/2 :: (name(), - fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok'). + fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) + -> 'not_found' | rabbit_types:amqqueue()). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | rabbit_types:error('not_found'); @@ -155,6 +156,7 @@ -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). +-spec(notify_decorators/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(resume/2 :: (pid(), pid()) -> 'ok'). @@ -278,9 +280,10 @@ update(Name, Fun) -> case Durable of true -> ok = mnesia:write(rabbit_durable_queue, Q1, write); _ -> ok - end; + end, + Q1; [] -> - ok + not_found end. store_queue(Q = #amqqueue{durable = true}) -> @@ -294,8 +297,12 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -policy_changed(Q1, Q2) -> +policy_changed(Q1 = #amqqueue{decorators = Decorators1}, + Q2 = #amqqueue{decorators = Decorators2}) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2), + D1 = rabbit_queue_decorator:select(Decorators1), + D2 = rabbit_queue_decorator:select(Decorators2), + [ok = M:policy_changed(Q1, Q2) || M <- lists:usort(D1 ++ D2)], %% Make sure we emit a stats event even if nothing %% mirroring-related has changed - the policy may have changed anyway. wake_up(Q1). @@ -568,6 +575,9 @@ basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). +notify_decorators(#amqqueue{pid = QPid}) -> + delegate:cast(QPid, notify_decorators). + notify_sent(QPid, ChPid) -> Key = {consumer_credit_to, QPid}, put(Key, case get(Key) of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 972e6be0..8f2dbc84 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -137,9 +137,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, senders = Senders, msg_id_to_channel = MTC}, State2 = process_args(State1), - lists:foldl(fun (Delivery, StateN) -> - deliver_or_enqueue(Delivery, true, StateN) - end, State2, Deliveries). + State3 = lists:foldl(fun (Delivery, StateN) -> + deliver_or_enqueue(Delivery, true, StateN) + end, State2, Deliveries), + notify_decorators(startup, [], State3), + State3. init_state(Q) -> State = #q{q = Q, @@ -198,6 +200,7 @@ declare(Recover, From, State = #q{q = Q, recovery_barrier(Recover), State1 = process_args(State#q{backing_queue = BQ, backing_queue_state = BQS}), + notify_decorators(startup, [], State), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -222,6 +225,27 @@ matches(new, Q1, Q2) -> matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. +notify_decorators(Event, Props, State) when Event =:= startup; + Event =:= shutdown -> + decorator_callback(qname(State), Event, Props); + +notify_decorators(Event, Props, State = #q{active_consumers = ACs, + backing_queue = BQ, + backing_queue_state = BQS}) -> + decorator_callback( + qname(State), notify, + [Event, [{max_active_consumer_priority, priority_queue:highest(ACs)}, + {is_empty, BQ:is_empty(BQS)} | Props]]). + +decorator_callback(QName, F, A) -> + %% Look up again in case policy and hence decorators have changed + case rabbit_amqqueue:lookup(QName) of + {ok, Q = #amqqueue{decorators = Ds}} -> + [ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)]; + {error, not_found} -> + ok + end. + bq_init(BQ, Q, Recover) -> Self = self(), BQ:init(Q, Recover =/= new, @@ -275,6 +299,7 @@ terminate_shutdown(Fun, State) -> undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), + notify_decorators(shutdown, [], State), [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _} <- consumers(State1)], State1#q{backing_queue_state = Fun(BQS)} @@ -405,15 +430,18 @@ erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. -block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> - update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}). +block_consumer(C = #cr{blocked_consumers = Blocked}, + {_ChPid, #consumer{tag = CTag}} = QEntry, State) -> + update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}), + notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State). is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). maybe_send_drained(WasEmpty, State) -> case (not WasEmpty) andalso is_empty(State) of - true -> [send_drained(C) || C <- all_ch_record()]; + true -> notify_decorators(queue_empty, [], State), + [send_drained(C) || C <- all_ch_record()]; false -> ok end, State. @@ -443,13 +471,13 @@ deliver_msgs_to_consumers(DeliverFun, false, deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, State) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of - true -> block_consumer(C, E), + true -> block_consumer(C, E, State), {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> - block_consumer(C#cr{limiter = Limiter}, E), + block_consumer(C#cr{limiter = Limiter}, E, State), {false, State}; {continue, Limiter} -> AC1 = priority_queue:in(E, Priority, @@ -674,6 +702,9 @@ unblock(State, C = #cr{limiter = Limiter}) -> update_ch_record(C#cr{blocked_consumers = BlockedQ}), AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), State1 = State#q{active_consumers = AC1}, + [notify_decorators( + consumer_unblocked, [{consumer_tag, CTag}], State1) || + {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], run_message_queue(State1) end. @@ -1181,6 +1212,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, not NoAck, qname(State1), OtherArgs), AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers), State2 = State1#q{active_consumers = AC1}, + notify_decorators( + basic_consume, [{consumer_tag, ConsumerTag}], State2), reply(ok, run_message_queue(State2)) end; @@ -1211,6 +1244,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, active_consumers = remove_consumer( ChPid, ConsumerTag, State#q.active_consumers)}, + notify_decorators( + basic_cancel, [{consumer_tag, ConsumerTag}], State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) @@ -1388,6 +1423,10 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, end end); +handle_cast(notify_decorators, State) -> + notify_decorators(refresh, [], State), + noreply(State); + handle_cast(wake_up, State) -> noreply(State). diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 0785d278..738fa92a 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -46,7 +46,8 @@ name(#exchange{policy = Policy}) -> name0(Policy). name0(undefined) -> none; name0(Policy) -> pget(name, Policy). -set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; +set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set( + Q#amqqueue{policy = set0(Name)}); set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set( X#exchange{policy = set0(Name)}). @@ -228,9 +229,14 @@ update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> case match(QName, Policies) of OldPolicy -> no_change; - NewPolicy -> rabbit_amqqueue:update( - QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end), - {Q, Q#amqqueue{policy = NewPolicy}} + NewPolicy -> case rabbit_amqqueue:update( + QName, fun(Q1) -> + rabbit_queue_decorator:set( + Q1#amqqueue{policy = NewPolicy}) + end) of + #amqqueue{} = Q1 -> {Q, Q1}; + not_found -> {Q, Q } + end end. notify(no_change)-> diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl new file mode 100644 index 00000000..8f6375a5 --- /dev/null +++ b/src/rabbit_queue_decorator.erl @@ -0,0 +1,48 @@ +-module(rabbit_queue_decorator). + +-include("rabbit.hrl"). + +-export([select/1, set/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(notify_event() :: 'consumer_blocked' | + 'consumer_unblocked' | + 'queue_empty' | + 'basic_consume' | + 'basic_cancel' | + 'refresh'). + +-callback startup(rabbit_types:amqqueue()) -> 'ok'. + +-callback shutdown(rabbit_types:amqqueue()) -> 'ok'. + +-callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> + 'ok'. + +-callback active_for(rabbit_types:amqqueue()) -> boolean(). + +-callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'. + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2}, + {active_for, 1}, {notify, 3}]; +behaviour_info(_Other) -> + undefined. + +-endif. + +%%---------------------------------------------------------------------------- + +select(Modules) -> + [M || M <- Modules, code:which(M) =/= non_existing]. + +set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}. + +list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)]. diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index f933e4e9..3014aeb7 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -130,6 +130,7 @@ class_module(exchange) -> rabbit_exchange_type; class_module(auth_mechanism) -> rabbit_auth_mechanism; class_module(runtime_parameter) -> rabbit_runtime_parameter; class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(queue_decorator) -> rabbit_queue_decorator; class_module(policy_validator) -> rabbit_policy_validator; class_module(ha_mode) -> rabbit_mirror_queue_mode. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index d50cb282..6f95ef60 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -45,6 +45,7 @@ -rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}). -rabbit_upgrade({exchange_decorators, mnesia, [policy]}). -rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}). +-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). %% ------------------------------------------------------------------- @@ -72,6 +73,7 @@ -spec(gm_pids/0 :: () -> 'ok'). -spec(exchange_decorators/0 :: () -> 'ok'). -spec(policy_apply_to/0 :: () -> 'ok'). +-spec(queue_decorators/0 :: () -> 'ok'). -endif. @@ -323,6 +325,21 @@ apply_to(Def) -> [_, _] -> <<"all">> end. +queue_decorators() -> + ok = queue_decorators(rabbit_queue), + ok = queue_decorators(rabbit_durable_queue). + +queue_decorators(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, Policy, GmPids}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, Policy, GmPids, []} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, policy, gm_pids, decorators]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |