diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-07-24 14:28:55 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-07-24 14:28:55 +0100 |
commit | b214a650f31cfeb2d96b6a68541f98ebe68547d2 (patch) | |
tree | e995d6aed4b762ad404b5ec1402d65dc3af975c2 | |
parent | 31675d9fc8b6947b88441580e0973ed45d76d715 (diff) | |
download | rabbitmq-server-b214a650f31cfeb2d96b6a68541f98ebe68547d2.tar.gz |
More symmetry; notify on basic consume and consumer unblock as well as basic.cancel and consumer block.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1244d640..eeebae3f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -695,7 +695,11 @@ unblock(State, C = #cr{limiter = Limiter}) -> UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ}), AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), - run_message_queue(State#q{active_consumers = AC1}) + State1 = State#q{active_consumers = AC1}, + [notify_decorators( + consumer_unblocked, [{consumer_tag, CTag}], State1) || + {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], + run_message_queue(State1) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -1201,7 +1205,10 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, AC1 = priority_queue:in({ChPid, Consumer}, consumer_priority({ChPid, Consumer}), State1#q.active_consumers), - reply(ok, run_message_queue(State1#q{active_consumers = AC1})) + State2 = State1#q{active_consumers = AC1}, + notify_decorators( + basic_consume, [{consumer_tag, ConsumerTag}], State2), + reply(ok, run_message_queue(State2)) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, |