From 0798dfad95793219c5ee53b81decbc451252e636 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 1 Aug 2011 15:01:06 +0100 Subject: Introduce rabbit:force_event_refresh which will: * Emit a foo_exists for every foo that can emit foo_created * As a side effect, wake everything up and thus send out foo_stats events too. Currently this doesn't work for direct connections (rather more fiddly), but it does for everything else. --- src/rabbit_amqqueue_process.erl | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) (limited to 'src/rabbit_amqqueue_process.erl') diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c6019413..c7e36283 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -767,7 +767,15 @@ emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> - rabbit_event:notify(consumer_created, + emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, + consumer_created). + +emit_consumer_exists(ChPid, ConsumerTag, Exclusive, AckRequired) -> + emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, + consumer_exists). + +emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, Type) -> + rabbit_event:notify(Type, [{consumer_tag, ConsumerTag}, {exclusive, Exclusive}, {ack_required, AckRequired}, @@ -1118,7 +1126,18 @@ handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> emit_stats(State), State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, assert_invariant(State1), - {noreply, State1, hibernate}. + {noreply, State1, hibernate}; + +handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> + rabbit_event:notify(queue_exists, infos(?CREATION_EVENT_KEYS, State)), + case Exclusive of + none -> [emit_consumer_exists(Ch, CTag, false, AckRequired) || + {Ch, CTag, AckRequired} <- consumers(State)]; + _ -> [emit_consumer_exists(Ch, CTag, true, AckRequired) || + {Ch, CTag, AckRequired} <- consumers(State), + Exclusive = {Ch, CTag}] + end, + noreply(State). handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> -- cgit v1.2.1