diff options
authorAlexandru Scvortov <>2011-09-14 21:28:10 +0100
committerAlexandru Scvortov <>2011-09-14 21:28:10 +0100
commite9abddfc8550f335f77d74377c9f364b33393bc7 (patch)
parentbb416385996c9cfcb5bd7c7c017c13d366703e5a (diff)
parente33c9c1550898dacd6fc97bd3ae9c55a455dac92 (diff)
merge from default + cosmetic
1 files changed, 72 insertions, 55 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dfe84644..a0d7fe79 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -190,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
- blocking = dict:new(),
+ blocking = gb_sets:new(),
consumer_monitors = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
@@ -275,7 +275,7 @@ handle_cast(terminate, State) ->
handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
- noreply(monitor_consumer(ConsumerTag, State));
+ noreply(consumer_monitor(ConsumerTag, State));
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
@@ -325,12 +325,12 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
handle_info({'DOWN', MRef, process, QPid, Reason},
State = #ch{consumer_monitors = ConsumerMonitors}) ->
+ State1 = handle_publishing_queue_down(QPid, Reason, State),
case dict:find(MRef, ConsumerMonitors) of
- error ->
- handle_publishing_queue_down(QPid, Reason, State);
- {ok, ConsumerTag} ->
- handle_consuming_queue_down(MRef, ConsumerTag, State)
+ error -> State1;
+ {ok, ConsumerTag} -> handle_consuming_queue_down(MRef, ConsumerTag,
+ State1)
handle_info({'EXIT', _Pid, Reason}, State) ->
@@ -516,17 +516,18 @@ check_name(_Kind, NameBin) ->
queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
- case dict:find(QPid, Blocking) of
- error -> State;
- {ok, MRef} -> true = erlang:demonitor(MRef),
- Blocking1 = dict:erase(QPid, Blocking),
- ok = case dict:size(Blocking1) of
- 0 -> rabbit_writer:send_command(
- State#ch.writer_pid,
- #'channel.flow_ok'{active = false});
- _ -> ok
- end,
- State#ch{blocking = Blocking1}
+ case gb_sets:is_element(QPid, Blocking) of
+ false -> State;
+ true -> Blocking1 = gb_sets:delete(QPid, Blocking),
+ ok = case gb_sets:is_empty(Blocking1) of
+ true -> rabbit_writer:send_command(
+ State#ch.writer_pid,
+ #'channel.flow_ok'{active = false});
+ false -> ok
+ end,
+ State1 = State#ch{blocking = Blocking1},
+ demonitor_queue(QPid, State1),
+ State1
record_confirm(undefined, _, State) ->
@@ -552,18 +553,21 @@ process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UMQ0) of
{value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
- Acc, Nack);
+ Acc, Nack, State);
none -> Acc
end, {[], UMQ, UQM}, MsgSeqNos),
{MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) ->
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack, State) ->
UQM1 = case gb_trees:lookup(QPid, UQM) of
{value, MsgSeqNos} ->
MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
case gb_sets:is_empty(MsgSeqNos1) of
- true -> gb_trees:delete(QPid, UQM);
+ true -> UQM2 = gb_trees:delete(QPid, UQM),
+ demonitor_queue(QPid,
+ State#ch{unconfirmed_qm = UQM2}),
+ UQM2;
false -> gb_trees:update(QPid, MsgSeqNos1, UQM)
none ->
@@ -746,12 +750,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end) of
{ok, Q} ->
State1 = State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- {Q, undefined},
+ dict:store(ActualConsumerTag, Q,
case NoWait of
- true -> monitor_consumer(ActualConsumerTag, State1);
+ true -> consumer_monitor(ActualConsumerTag, State1);
false -> State1
{{error, exclusive_consume_unavailable}, _Q} ->
@@ -774,16 +777,15 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, {Q, MRef}} ->
- ConsumerMonitors1 =
- case MRef of
- undefined -> ConsumerMonitors;
- _ -> true = erlang:demonitor(MRef),
- dict:erase(MRef, ConsumerMonitors)
- end,
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ ConsumerMonitors1 = case get({monitoring, QPid}) of
+ undefined -> ConsumerMonitors;
+ MRef -> dict:erase(MRef, ConsumerMonitors)
+ end,
NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
consumer_monitors = ConsumerMonitors1},
+ ok = demonitor_queue(Q, NewState),
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -1108,10 +1110,9 @@ handle_method(#'channel.flow'{active = false}, _,
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
- QPid <- QPids],
+ QPids -> Queues = [begin monitor_queue(QPid), QPid end || QPid <- QPids],
ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State1#ch{blocking = dict:from_list(Queues)}}
+ {noreply, State1#ch{blocking = gb_sets:from_list(Queues)}}
handle_method(_MethodRecord, _Content, _State) ->
@@ -1120,25 +1121,49 @@ handle_method(_MethodRecord, _Content, _State) ->
-monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors,
- capabilities = Capabilities}) ->
+monitor_queue(QPid) ->
+ case get({monitoring, QPid}) of
+ undefined -> MRef = erlang:monitor(process, QPid),
+ put({monitoring, QPid}, MRef),
+ MRef;
+ MRef -> MRef
+ end.
+ State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
+ capabilities = Capabilities}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} ->
- {#amqqueue{pid = QPid} = Q, undefined} =
- dict:fetch(ConsumerTag, ConsumerMapping),
- MRef = erlang:monitor(process, QPid),
- State#ch{consumer_mapping =
- dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping),
- consumer_monitors =
+ #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
+ MRef = monitor_queue(QPid),
+ State#ch{consumer_monitors =
dict:store(MRef, ConsumerTag, ConsumerMonitors)};
_ ->
+demonitor_queue(QPid, #ch{stats_timer = StatsTimer,
+ consumer_monitors = ConsumerMonitors,
+ blocking = Blocking}) ->
+ case get({monitoring, QPid}) of
+ undefined -> ok;
+ MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
+ ConsumerMonitored =
+ dict:find(MRef, ConsumerMonitors) =/= error,
+ QueueBlocked = gb_sets:is_element(QPid, Blocking),
+ case not StatsEnabled and not ConsumerMonitored and
+ not QueueBlocked of
+ true -> true = erlang:demonitor(MRef),
+ erase({monitoring, QPid});
+ false -> ok
+ end,
+ ok
+ end.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
- MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
+ MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
none -> []
@@ -1307,8 +1332,7 @@ limit_queues(Limiter, #ch{consumer_mapping = Consumers}) ->
consumer_queues(Consumers) ->
lists:usort([QPid ||
- {_Key, {#amqqueue{pid = QPid}, _MRef}}
- <- dict:to_list(Consumers)]).
+ {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
@@ -1356,12 +1380,12 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
SingletonSet = gb_sets:singleton(MsgSeqNo),
UQM1 = lists:foldl(
fun (QPid, UQM2) ->
- maybe_monitor(QPid),
case gb_trees:lookup(QPid, UQM2) of
{value, MsgSeqNos} ->
MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
gb_trees:update(QPid, MsgSeqNos1, UQM2);
none ->
+ monitor_queue(QPid),
gb_trees:insert(QPid, SingletonSet, UQM2)
end, UQM, QPids),
@@ -1479,21 +1503,14 @@ maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
incr_stats({QPid, _} = QX, Inc, Measure) ->
- maybe_monitor(QPid),
+ monitor_queue(QPid),
update_measures(queue_exchange_stats, QX, Inc, Measure);
incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
- maybe_monitor(QPid),
+ monitor_queue(QPid),
update_measures(queue_stats, QPid, Inc, Measure);
incr_stats(X, Inc, Measure) ->
update_measures(exchange_stats, X, Inc, Measure).
-maybe_monitor(QPid) ->
- case get({monitoring, QPid}) of
- undefined -> erlang:monitor(process, QPid),
- put({monitoring, QPid}, true);
- _ -> ok
- end.
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of
undefined -> [];