diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-12-03 15:31:13 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-12-03 15:31:13 +0000 |
commit | 069061cdb87ff723e2685461100ef36fb0ab523d (patch) | |
tree | 324181d3668be395c4643418149f6f50fa8a1213 /src/rabbit_amqqueue_process.erl | |
parent | c7a3d3c7593b5cd80d41f8763263e43acf24a54c (diff) | |
download | rabbitmq-server-069061cdb87ff723e2685461100ef36fb0ab523d.tar.gz |
Add consumer created / deleted events.
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 49 |
1 files changed, 38 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 25859c22..9f43a62b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -203,6 +203,8 @@ terminate_shutdown(Fun, State) -> BQ:tx_rollback(Txn, BQSN), BQSN1 end, BQS, all_ch_record()), + [emit_consumer_deleted(CTag, Ch) + || {CTag, Ch, _} <- consumers(State1)], rabbit_event:notify(queue_deleted, [{pid, self()}]), State1#q{backing_queue_state = Fun(BQS1)} end. @@ -540,12 +542,18 @@ remove_consumer(ChPid, ConsumerTag, Queue) -> end, Queue). remove_consumers(ChPid, Queue) -> - queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue). + {Kept, Removed} = split_by_channel(ChPid, Queue), + [emit_consumer_deleted(CTag, Ch) || {Ch, #consumer{tag = CTag}} <- Removed], + Kept. move_consumers(ChPid, From, To) -> + {Kept, Removed} = split_by_channel(ChPid, From), + {Kept, queue:join(To, Removed)}. + +split_by_channel(ChPid, Queue) -> {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(From)), - {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. + queue:to_list(Queue)), + {queue:from_list(Kept), queue:from_list(Removed)}. possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -728,12 +736,34 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). +consumers(#q{active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers}) -> + rabbit_misc:queue_fold( + fun ({ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}, Acc) -> + [{ChPid, ConsumerTag, AckRequired} | Acc] + end, [], queue:join(ActiveConsumers, BlockedConsumers)). + emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). +emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> + rabbit_event:notify(consumer_created, + [{consumer_tag, ConsumerTag}, + {exclusive, Exclusive}, + {ack_required, AckRequired}, + {channel_pid, ChPid}, + {queue_pid, self()}]). + +emit_consumer_deleted(ChPid, ConsumerTag) -> + rabbit_event:notify(consumer_deleted, + [{consumer_tag, ConsumerTag}, + {channel_pid, ChPid}, + {queue_pid, self()}]). + %--------------------------------------------------------------------------- prioritise_call(Msg, _From, _State) -> @@ -796,14 +826,8 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(consumers, _From, - State = #q{active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> - reply(rabbit_misc:queue_fold( - fun ({ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}, Acc) -> - [{ChPid, ConsumerTag, AckRequired} | Acc] - end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); +handle_call(consumers, _From, State) -> + reply(consumers(State), State); handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, _From, State) -> @@ -906,6 +930,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, ChPid, Consumer, State1#q.active_consumers)}) end, + emit_consumer_created(ConsumerTag, ChPid, ExclusiveConsume, + not NoAck), reply(ok, State2) end; @@ -924,6 +950,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, C1#cr{limiter_pid = undefined}; _ -> C1 end), + emit_consumer_deleted(ConsumerTag, ChPid), ok = maybe_send_reply(ChPid, OkMsg), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, |