summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-24 14:28:55 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-24 14:28:55 +0100
commitb214a650f31cfeb2d96b6a68541f98ebe68547d2 (patch)
treee995d6aed4b762ad404b5ec1402d65dc3af975c2
parent31675d9fc8b6947b88441580e0973ed45d76d715 (diff)
downloadrabbitmq-server-b214a650f31cfeb2d96b6a68541f98ebe68547d2.tar.gz
More symmetry; notify on basic consume and consumer unblock as well as basic.cancel and consumer block.
-rw-r--r--src/rabbit_amqqueue_process.erl11
1 files changed, 9 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1244d640..eeebae3f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -695,7 +695,11 @@ unblock(State, C = #cr{limiter = Limiter}) ->
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
- run_message_queue(State#q{active_consumers = AC1})
+ State1 = State#q{active_consumers = AC1},
+ [notify_decorators(
+ consumer_unblocked, [{consumer_tag, CTag}], State1) ||
+ {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
+ run_message_queue(State1)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -1201,7 +1205,10 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
AC1 = priority_queue:in({ChPid, Consumer},
consumer_priority({ChPid, Consumer}),
State1#q.active_consumers),
- reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
+ State2 = State1#q{active_consumers = AC1},
+ notify_decorators(
+ basic_consume, [{consumer_tag, ConsumerTag}], State2),
+ reply(ok, run_message_queue(State2))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,