summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-12-03 15:31:13 +0000
committerSimon MacMullen <simon@rabbitmq.com>2010-12-03 15:31:13 +0000
commit069061cdb87ff723e2685461100ef36fb0ab523d (patch)
tree324181d3668be395c4643418149f6f50fa8a1213
parentc7a3d3c7593b5cd80d41f8763263e43acf24a54c (diff)
downloadrabbitmq-server-069061cdb87ff723e2685461100ef36fb0ab523d.tar.gz
Add consumer created / deleted events.
-rw-r--r--src/rabbit_amqqueue_process.erl49
1 files changed, 38 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 25859c22..9f43a62b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -203,6 +203,8 @@ terminate_shutdown(Fun, State) ->
BQ:tx_rollback(Txn, BQSN),
BQSN1
end, BQS, all_ch_record()),
+ [emit_consumer_deleted(CTag, Ch)
+ || {CTag, Ch, _} <- consumers(State1)],
rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -540,12 +542,18 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
end, Queue).
remove_consumers(ChPid, Queue) ->
- queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue).
+ {Kept, Removed} = split_by_channel(ChPid, Queue),
+ [emit_consumer_deleted(CTag, Ch) || {Ch, #consumer{tag = CTag}} <- Removed],
+ Kept.
move_consumers(ChPid, From, To) ->
+ {Kept, Removed} = split_by_channel(ChPid, From),
+ {Kept, queue:join(To, Removed)}.
+
+split_by_channel(ChPid, Queue) ->
{Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(From)),
- {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
+ queue:to_list(Queue)),
+ {queue:from_list(Kept), queue:from_list(Removed)}.
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -728,12 +736,34 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
+consumers(#q{active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
+ rabbit_misc:queue_fold(
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+
emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+ rabbit_event:notify(consumer_created,
+ [{consumer_tag, ConsumerTag},
+ {exclusive, Exclusive},
+ {ack_required, AckRequired},
+ {channel_pid, ChPid},
+ {queue_pid, self()}]).
+
+emit_consumer_deleted(ChPid, ConsumerTag) ->
+ rabbit_event:notify(consumer_deleted,
+ [{consumer_tag, ConsumerTag},
+ {channel_pid, ChPid},
+ {queue_pid, self()}]).
+
%---------------------------------------------------------------------------
prioritise_call(Msg, _From, _State) ->
@@ -796,14 +826,8 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(consumers, _From,
- State = #q{active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
- reply(rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
+handle_call(consumers, _From, State) ->
+ reply(consumers(State), State);
handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
_From, State) ->
@@ -906,6 +930,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ChPid, Consumer,
State1#q.active_consumers)})
end,
+ emit_consumer_created(ConsumerTag, ChPid, ExclusiveConsume,
+ not NoAck),
reply(ok, State2)
end;
@@ -924,6 +950,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
C1#cr{limiter_pid = undefined};
_ -> C1
end),
+ emit_consumer_deleted(ConsumerTag, ChPid),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
State#q{exclusive_consumer = cancel_holder(ChPid,