diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 12:58:25 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 12:58:25 +0000 |
commit | 3ab3d8c77f0fa04ebbd350b188b482791f82dc09 (patch) | |
tree | be1da9be3d373fd78cd9a5fa35ef29f3e5a0dbdf | |
parent | 7661b35c2ecf54716cdf8aa7b76861afa2635092 (diff) | |
download | rabbitmq-server-3ab3d8c77f0fa04ebbd350b188b482791f82dc09.tar.gz |
Simplfy (although perhaps not as much as we had hoped) by only informing queue decorators when the active consumers may have changed, rather than trying to give them more information about what is happening.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 48 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 31 | ||||
-rw-r--r-- | src/rabbit_queue_decorator.erl | 11 |
3 files changed, 38 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 281aecb9..7597ec9d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -126,7 +126,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State3 = lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) end, State2, Deliveries), - notify_decorators(startup, [], State3), + notify_decorators(startup, State3), State3. init_state(Q) -> @@ -188,7 +188,7 @@ declare(Recover, From, State = #q{q = Q, State1 = process_args_policy( State#q{backing_queue = BQ, backing_queue_state = BQS}), - notify_decorators(startup, [], State), + notify_decorators(startup, State), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -213,18 +213,16 @@ matches(new, Q1, Q2) -> matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. -notify_decorators(Event, Props, State) when Event =:= startup; - Event =:= shutdown -> - decorator_callback(qname(State), Event, Props); +notify_decorators(Event, State) -> + decorator_callback(qname(State), Event, []). -notify_decorators(Event, Props, State = #q{consumers = Consumers, - backing_queue = BQ, - backing_queue_state = BQS}) -> +notify_decorators(State = #q{consumers = Consumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> P = rabbit_queue_consumers:max_active_priority(Consumers), - decorator_callback(qname(State), notify, - [Event, [{max_active_consumer_priority, P}, - {is_empty, BQ:is_empty(BQS)} | - Props]]). + decorator_callback(qname(State), active_consumers_changed, + [[{max_active_consumer_priority, P}, + {is_empty, BQ:is_empty(BQS)}]]). decorator_callback(QName, F, A) -> %% Look up again in case policy and hence decorators have changed @@ -308,7 +306,7 @@ terminate_shutdown(Fun, State) -> undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), - notify_decorators(shutdown, [], State), + notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _, _} <- rabbit_queue_consumers:all(Consumers)], @@ -401,7 +399,7 @@ is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). maybe_send_drained(WasEmpty, State) -> case (not WasEmpty) andalso is_empty(State) of - true -> notify_decorators(queue_empty, [], State), + true -> notify_decorators(State), rabbit_queue_consumers:send_drained(); false -> ok end, @@ -412,8 +410,10 @@ deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) -> rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State, Consumers), State2 = State1#q{consumers = Consumers1}, - [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State2) || - {_ChPid, CTag} <- Blocked], + case Blocked of + true -> notify_decorators(State2); + false -> ok + end, {Active, State2}. confirm_messages([], State) -> @@ -568,10 +568,9 @@ possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of unchanged -> State; - {unblocked, UnblockedCTags, Consumers1} -> + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, - [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}], - State1) || CTag <- UnblockedCTags], + notify_decorators(State1), run_message_queue(State1) end. @@ -599,8 +598,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, end, State2 = State1#q{consumers = Consumers1, exclusive_consumer = Holder1}, - [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || - CTag <- ChCTags], + notify_decorators(State2), case should_auto_delete(State2) of true -> {stop, State2}; false -> {ok, requeue_and_run(ChAckTags, @@ -1034,8 +1032,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, qname(State1), OtherArgs), - notify_decorators( - basic_consume, [{consumer_tag, ConsumerTag}], State1), + notify_decorators(State1), reply(ok, run_message_queue(State1)) end; @@ -1054,8 +1051,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State1 = State#q{consumers = Consumers1, exclusive_consumer = Holder1}, emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)), - notify_decorators( - basic_cancel, [{consumer_tag, ConsumerTag}], State1), + notify_decorators(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) @@ -1218,7 +1214,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, ChPid, State)); handle_cast(notify_decorators, State) -> - notify_decorators(refresh, [], State), + notify_decorators(State), noreply(State); handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 702091dc..19b68cac 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -82,8 +82,7 @@ -spec record_ack(ch(), pid(), ack()) -> 'ok'. -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> - 'unchanged' | - {'unblocked', [rabbit_types:ctag()], state()}. + 'unchanged' | {'unblocked', state()}. -spec resume_fun() -> cr_fun(). -spec notify_sent_fun(non_neg_integer()) -> cr_fun(). -spec activate_limit_fun() -> cr_fun(). @@ -182,42 +181,41 @@ send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. deliver(FetchFun, Stop, QName, S, State) -> - deliver(FetchFun, Stop, QName, [], S, State). + deliver(FetchFun, Stop, QName, false, S, State). -deliver(_FetchFun, true, _QName, Blocked, S, State) -> - {true, Blocked, S, State}; -deliver( FetchFun, false, QName, Blocked, S, +deliver(_FetchFun, true, _QName, NewlyBlocked, S, State) -> + {true, NewlyBlocked, S, State}; +deliver( FetchFun, false, QName, NewlyBlocked, S, State = #state{consumers = Consumers, use = Use}) -> case priority_queue:out_p(Consumers) of {empty, _} -> - {false, Blocked, S, State#state{use = update_use(Use, inactive)}}; + Use1 = update_use(Use, inactive), + {false, NewlyBlocked, S, State#state{use = Use1}}; {{value, QEntry, Priority}, Tail} -> - {Stop, Blocked1, S1, Consumers1} = + {Stop, NewlyBlocked1, S1, Consumers1} = deliver_to_consumer(FetchFun, QEntry, Priority, QName, - Blocked, S, Tail), - deliver(FetchFun, Stop, QName, Blocked1, S1, + NewlyBlocked, S, Tail), + deliver(FetchFun, Stop, QName, NewlyBlocked1, S1, State#state{consumers = Consumers1}) end. deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName, - Blocked, S, Consumers) -> + NewlyBlocked, S, Consumers) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), - Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked], - {false, Blocked1, S, Consumers}; + {false, true, S, Consumers}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), - Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked], - {false, Blocked1, S, Consumers}; + {false, true, S, Consumers}; {continue, Limiter} -> {Stop, S1} = deliver_to_consumer( FetchFun, Consumer, C#cr{limiter = Limiter}, QName, S), - {Stop, Blocked, S1, + {Stop, NewlyBlocked, S1, priority_queue:in(E, Priority, Consumers)} end end. @@ -290,7 +288,6 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ1}), {unblocked, - tags(Unblocked), State#state{consumers = priority_queue:join(Consumers, UnblockedQ), use = update_use(Use, active)}} end. diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 8f6375a5..b3c02403 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -8,13 +8,6 @@ -ifdef(use_specs). --type(notify_event() :: 'consumer_blocked' | - 'consumer_unblocked' | - 'queue_empty' | - 'basic_consume' | - 'basic_cancel' | - 'refresh'). - -callback startup(rabbit_types:amqqueue()) -> 'ok'. -callback shutdown(rabbit_types:amqqueue()) -> 'ok'. @@ -24,7 +17,7 @@ -callback active_for(rabbit_types:amqqueue()) -> boolean(). --callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'. +-callback active_consumers_changed(rabbit_types:amqqueue(), any()) -> 'ok'. -else. @@ -32,7 +25,7 @@ behaviour_info(callbacks) -> [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2}, - {active_for, 1}, {notify, 3}]; + {active_for, 1}, {active_consumers_changed, 2}]; behaviour_info(_Other) -> undefined. |