summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-08-05 17:15:08 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-08-05 17:15:08 +0100
commitb73d562111cd764097cde55ff761e3c2a2d82ec0 (patch)
tree3108af27d17c0f07ab7c0ace5370eecf48f67f0e /src/rabbit_amqqueue_process.erl
parent51c8e02239abcb2f8a9a80db8ed2385e0ce67d10 (diff)
parentb18de45aaea54480827f593e1caced01c89d7e73 (diff)
downloadrabbitmq-server-b73d562111cd764097cde55ff761e3c2a2d82ec0.tar.gz
Merge in default
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl21
1 files changed, 20 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 05de48d6..e1fa7006 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -761,7 +761,15 @@ emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
- rabbit_event:notify(consumer_created,
+ emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired,
+ consumer_created).
+
+emit_consumer_exists(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+ emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired,
+ consumer_exists).
+
+emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, Type) ->
+ rabbit_event:notify(Type,
[{consumer_tag, ConsumerTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
@@ -1085,6 +1093,17 @@ handle_cast({set_ram_duration_target, Duration},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State);
+
+handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
+ rabbit_event:notify(queue_exists, infos(?CREATION_EVENT_KEYS, State)),
+ case Exclusive of
+ none -> [emit_consumer_exists(Ch, CTag, false, AckRequired) ||
+ {Ch, CTag, AckRequired} <- consumers(State)];
+ _ -> [emit_consumer_exists(Ch, CTag, true, AckRequired) ||
+ {Ch, CTag, AckRequired} <- consumers(State),
+ Exclusive = {Ch, CTag}]
+ end,
noreply(State).
handle_info(maybe_expire, State) ->