summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-07 12:58:25 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-07 12:58:25 +0000
commit3ab3d8c77f0fa04ebbd350b188b482791f82dc09 (patch)
treebe1da9be3d373fd78cd9a5fa35ef29f3e5a0dbdf
parent7661b35c2ecf54716cdf8aa7b76861afa2635092 (diff)
downloadrabbitmq-server-3ab3d8c77f0fa04ebbd350b188b482791f82dc09.tar.gz
Simplfy (although perhaps not as much as we had hoped) by only informing queue decorators when the active consumers may have changed, rather than trying to give them more information about what is happening.
-rw-r--r--src/rabbit_amqqueue_process.erl48
-rw-r--r--src/rabbit_queue_consumers.erl31
-rw-r--r--src/rabbit_queue_decorator.erl11
3 files changed, 38 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 281aecb9..7597ec9d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -126,7 +126,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State3 = lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
end, State2, Deliveries),
- notify_decorators(startup, [], State3),
+ notify_decorators(startup, State3),
State3.
init_state(Q) ->
@@ -188,7 +188,7 @@ declare(Recover, From, State = #q{q = Q,
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
- notify_decorators(startup, [], State),
+ notify_decorators(startup, State),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -213,18 +213,16 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
-notify_decorators(Event, Props, State) when Event =:= startup;
- Event =:= shutdown ->
- decorator_callback(qname(State), Event, Props);
+notify_decorators(Event, State) ->
+ decorator_callback(qname(State), Event, []).
-notify_decorators(Event, Props, State = #q{consumers = Consumers,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
+notify_decorators(State = #q{consumers = Consumers,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
P = rabbit_queue_consumers:max_active_priority(Consumers),
- decorator_callback(qname(State), notify,
- [Event, [{max_active_consumer_priority, P},
- {is_empty, BQ:is_empty(BQS)} |
- Props]]).
+ decorator_callback(qname(State), active_consumers_changed,
+ [[{max_active_consumer_priority, P},
+ {is_empty, BQ:is_empty(BQS)}]]).
decorator_callback(QName, F, A) ->
%% Look up again in case policy and hence decorators have changed
@@ -308,7 +306,7 @@ terminate_shutdown(Fun, State) ->
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
- notify_decorators(shutdown, [], State),
+ notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName) ||
{Ch, CTag, _, _} <-
rabbit_queue_consumers:all(Consumers)],
@@ -401,7 +399,7 @@ is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
- true -> notify_decorators(queue_empty, [], State),
+ true -> notify_decorators(State),
rabbit_queue_consumers:send_drained();
false -> ok
end,
@@ -412,8 +410,10 @@ deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) ->
rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State,
Consumers),
State2 = State1#q{consumers = Consumers1},
- [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State2) ||
- {_ChPid, CTag} <- Blocked],
+ case Blocked of
+ true -> notify_decorators(State2);
+ false -> ok
+ end,
{Active, State2}.
confirm_messages([], State) ->
@@ -568,10 +568,9 @@ possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of
unchanged ->
State;
- {unblocked, UnblockedCTags, Consumers1} ->
+ {unblocked, Consumers1} ->
State1 = State#q{consumers = Consumers1},
- [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}],
- State1) || CTag <- UnblockedCTags],
+ notify_decorators(State1),
run_message_queue(State1)
end.
@@ -599,8 +598,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
end,
State2 = State1#q{consumers = Consumers1,
exclusive_consumer = Holder1},
- [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
- CTag <- ChCTags],
+ notify_decorators(State2),
case should_auto_delete(State2) of
true -> {stop, State2};
false -> {ok, requeue_and_run(ChAckTags,
@@ -1034,8 +1032,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, qname(State1), OtherArgs),
- notify_decorators(
- basic_consume, [{consumer_tag, ConsumerTag}], State1),
+ notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
@@ -1054,8 +1051,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State1 = State#q{consumers = Consumers1,
exclusive_consumer = Holder1},
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)),
- notify_decorators(
- basic_cancel, [{consumer_tag, ConsumerTag}], State1),
+ notify_decorators(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)
@@ -1218,7 +1214,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
ChPid, State));
handle_cast(notify_decorators, State) ->
- notify_decorators(refresh, [], State),
+ notify_decorators(State),
noreply(State);
handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 702091dc..19b68cac 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -82,8 +82,7 @@
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
- 'unchanged' |
- {'unblocked', [rabbit_types:ctag()], state()}.
+ 'unchanged' | {'unblocked', state()}.
-spec resume_fun() -> cr_fun().
-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
-spec activate_limit_fun() -> cr_fun().
@@ -182,42 +181,41 @@ send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
deliver(FetchFun, Stop, QName, S, State) ->
- deliver(FetchFun, Stop, QName, [], S, State).
+ deliver(FetchFun, Stop, QName, false, S, State).
-deliver(_FetchFun, true, _QName, Blocked, S, State) ->
- {true, Blocked, S, State};
-deliver( FetchFun, false, QName, Blocked, S,
+deliver(_FetchFun, true, _QName, NewlyBlocked, S, State) ->
+ {true, NewlyBlocked, S, State};
+deliver( FetchFun, false, QName, NewlyBlocked, S,
State = #state{consumers = Consumers, use = Use}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
- {false, Blocked, S, State#state{use = update_use(Use, inactive)}};
+ Use1 = update_use(Use, inactive),
+ {false, NewlyBlocked, S, State#state{use = Use1}};
{{value, QEntry, Priority}, Tail} ->
- {Stop, Blocked1, S1, Consumers1} =
+ {Stop, NewlyBlocked1, S1, Consumers1} =
deliver_to_consumer(FetchFun, QEntry, Priority, QName,
- Blocked, S, Tail),
- deliver(FetchFun, Stop, QName, Blocked1, S1,
+ NewlyBlocked, S, Tail),
+ deliver(FetchFun, Stop, QName, NewlyBlocked1, S1,
State#state{consumers = Consumers1})
end.
deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName,
- Blocked, S, Consumers) ->
+ NewlyBlocked, S, Consumers) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
- Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
- {false, Blocked1, S, Consumers};
+ {false, true, S, Consumers};
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
block_consumer(C#cr{limiter = Limiter}, E),
- Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked],
- {false, Blocked1, S, Consumers};
+ {false, true, S, Consumers};
{continue, Limiter} ->
{Stop, S1} = deliver_to_consumer(
FetchFun, Consumer,
C#cr{limiter = Limiter}, QName, S),
- {Stop, Blocked, S1,
+ {Stop, NewlyBlocked, S1,
priority_queue:in(E, Priority, Consumers)}
end
end.
@@ -290,7 +288,6 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
{unblocked,
- tags(Unblocked),
State#state{consumers = priority_queue:join(Consumers, UnblockedQ),
use = update_use(Use, active)}}
end.
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 8f6375a5..b3c02403 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -8,13 +8,6 @@
-ifdef(use_specs).
--type(notify_event() :: 'consumer_blocked' |
- 'consumer_unblocked' |
- 'queue_empty' |
- 'basic_consume' |
- 'basic_cancel' |
- 'refresh').
-
-callback startup(rabbit_types:amqqueue()) -> 'ok'.
-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
@@ -24,7 +17,7 @@
-callback active_for(rabbit_types:amqqueue()) -> boolean().
--callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'.
+-callback active_consumers_changed(rabbit_types:amqqueue(), any()) -> 'ok'.
-else.
@@ -32,7 +25,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
- {active_for, 1}, {notify, 3}];
+ {active_for, 1}, {active_consumers_changed, 2}];
behaviour_info(_Other) ->
undefined.