diff options
authorAlexandru Scvortov <>2011-09-15 13:36:52 +0100
committerAlexandru Scvortov <>2011-09-15 13:36:52 +0100
commit8477b66ab395918475b66ae9f756d45f41cc2548 (patch)
parent8acdda1f848597d9a403e02aaffbfb270e0767bf (diff)
monitor_queue/demonitor_queue are symmetric; don't use process dictionary
1 files changed, 149 insertions, 126 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 684c4206..2948067a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,7 +38,8 @@
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirmed, capabilities, trace_state}).
+ unconfirmed_qm, confirmed, capabilities, trace_state,
+ queue_monitors}).
@@ -200,7 +201,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
unconfirmed_qm = gb_trees:empty(),
confirmed = [],
capabilities = Capabilities,
- trace_state = rabbit_trace:init(VHost)},
+ trace_state = rabbit_trace:init(VHost),
+ queue_monitors = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
fun() -> emit_stats(State) end),
@@ -299,13 +301,13 @@ handle_cast({deliver, ConsumerTag, AckRequired,
exchange =,
routing_key = RoutingKey},
rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State),
- maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State1),
+ State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State1#ch{next_tag = DeliveryTag + 1});
+ noreply(State3#ch{next_tag = DeliveryTag + 1});
handle_cast(force_event_refresh, State) ->
@@ -524,9 +526,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
#'channel.flow_ok'{active = false});
false -> ok
- State1 = State#ch{blocking = Blocking1},
- demonitor_queue(QPid, State1),
- State1
+ demonitor_queue(QPid, State#ch{blocking = Blocking1})
record_confirm(undefined, _, State) ->
@@ -545,41 +545,43 @@ confirm(MsgSeqNos, QPid, State) ->
{MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
record_confirms(MXs, State1).
-process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}) ->
- {MXs, UMQ1, UQM1} =
- lists:foldl(
- fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UMQ0) of
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
- Acc, Nack, State);
- none -> Acc
- end
- end, {[], UMQ, UQM}, MsgSeqNos),
- {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack, State) ->
- UQM1 = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
- case gb_sets:is_empty(MsgSeqNos1) of
- true -> UQM2 = gb_trees:delete(QPid, UQM),
- demonitor_queue(QPid,
- State#ch{unconfirmed_qm = UQM2}),
- UQM2;
- false -> gb_trees:update(QPid, MsgSeqNos1, UQM)
- end;
- none ->
- end,
+process_confirms(MsgSeqNos, QPid, Nack, State) ->
+ lists:foldl(
+ fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UMQ0) of
+ {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
+ Acc, Nack);
+ none -> Acc
+ end
+ end, {[], State}, MsgSeqNos).
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
+ {MXs, State = #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM}},
+ Nack) ->
+ State1 =
+ case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
+ case gb_sets:is_empty(MsgSeqNos1) of
+ true -> demonitor_queue(
+ QPid, State#ch{unconfirmed_qm =
+ gb_trees:delete(QPid, UQM)});
+ false -> State#ch{unconfirmed_qm =
+ gb_trees:update(QPid, MsgSeqNos1, UQM)}
+ end;
+ none ->
+ State
+ end,
Qs1 = gb_sets:del_element(QPid, Qs),
%% If QPid somehow died initiating a nack, clear the message from
%% internal data-structures. Also, cleanup empty entries.
case (Nack orelse gb_sets:is_empty(Qs1)) of
true ->
- {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
+ {[{MsgSeqNo, XName} | MXs],
+ State1#ch{unconfirmed_mq = gb_trees:delete(MsgSeqNo, UMQ)}};
false ->
- {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
+ {MXs, State1#ch{unconfirmed_mq =
+ gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ)}}
handle_method(#''{}, _, State = #ch{state = starting}) ->
@@ -696,11 +698,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
- maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State),
- maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ State2 = maybe_incr_stats([{QPid, 1}], case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State1),
+ State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
@@ -710,7 +712,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
- {noreply, State1#ch{next_tag = DeliveryTag + 1}};
+ {noreply, State3#ch{next_tag = DeliveryTag + 1}};
empty ->
{reply, #'basic.get_empty'{}, State}
@@ -769,7 +771,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
- _, State = #ch{consumer_mapping = ConsumerMapping,
+ _, State = #ch{consumer_mapping = ConsumerMapping,
consumer_monitors = ConsumerMonitors}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
@@ -777,15 +779,18 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
{ok, Q = #amqqueue{pid = QPid}} ->
- ConsumerMonitors1 = case get({monitoring, QPid}) of
- undefined -> ConsumerMonitors;
- _ -> gb_sets:delete(QPid,
- ConsumerMonitors)
- end,
- NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
- ConsumerMapping),
- consumer_monitors = ConsumerMonitors1},
- ok = demonitor_queue(Q, NewState),
+ ConsumerMapping1 = dict:erase(ConsumerTag,ConsumerMapping),
+ ConsumerMonitors1 =
+ case dict:size(dict:filter(fun (_, #amqqueue{pid = QPid0})
+ when QPid0 =:= QPid -> true;
+ (_, _) -> false
+ end, ConsumerMapping1)) of
+ 0 -> gb_sets:delete(QPid, ConsumerMonitors);
+ _ -> ConsumerMonitors
+ end,
+ State1 = State#ch{consumer_mapping = ConsumerMapping1,
+ consumer_monitors = ConsumerMonitors1},
+ State2 = demonitor_queue(Q, State1),
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -800,10 +805,10 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
consumer_tag = ConsumerTag}))
end) of
ok ->
- {noreply, NewState};
+ {noreply, State2};
{error, not_found} ->
%% Spec requires we ignore this situation.
- return_ok(NewState, NoWait, OkMsg)
+ return_ok(State2, NoWait, OkMsg)
@@ -1109,10 +1114,15 @@ handle_method(#'channel.flow'{active = false}, _,
State1 = State#ch{limiter = Limiter1},
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
- [] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> Queues = [begin monitor_queue(QPid), QPid end || QPid <- QPids],
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State1#ch{blocking = gb_sets:from_list(Queues)}}
+ [] ->
+ {reply, #'channel.flow_ok'{active = false}, State1};
+ QPids ->
+ State2 = lists:foldl(fun (QPid, State0) -> monitor_queue(QPid, State0)
+ end,
+ State1#ch{blocking = gb_sets:from_list(QPids)},
+ QPids),
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ {noreply, State2}
handle_method(_MethodRecord, _Content, _State) ->
@@ -1121,12 +1131,17 @@ handle_method(_MethodRecord, _Content, _State) ->
-monitor_queue(QPid) ->
- case get({monitoring, QPid}) of
- undefined -> MRef = erlang:monitor(process, QPid),
- put({monitoring, QPid}, MRef),
- MRef;
- MRef -> MRef
+monitor_queue(QPid, State = #ch{queue_monitors = QueueMonitors}) ->
+ case dict:is_key(QPid, QueueMonitors) of
+ false -> case queue_monitor_needed(QPid, State) of
+ false -> State;
+ true -> QueueMonitors1 = dict:store(QPid,
+ erlang:monitor(process,
+ QPid),
+ QueueMonitors),
+ State#ch{queue_monitors = QueueMonitors1}
+ end;
+ true -> State
@@ -1137,31 +1152,34 @@ consumer_monitor(ConsumerTag,
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} ->
#amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
- monitor_queue(QPid),
- State#ch{consumer_monitors = gb_sets:add(QPid, ConsumerMonitors)};
+ monitor_queue(QPid,
+ State#ch{consumer_monitors =
+ gb_sets:add(QPid, ConsumerMonitors)});
_ ->
-demonitor_queue(QPid, #ch{stats_timer = StatsTimer,
- consumer_monitors = ConsumerMonitors,
- blocking = Blocking,
- unconfirmed_qm = UQM}) ->
- case get({monitoring, QPid}) of
- undefined -> ok;
- MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
- ConsumerMonitored = gb_sets:is_member(QPid, ConsumerMonitors),
- QueueBlocked = gb_sets:is_element(QPid, Blocking),
- ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
- case StatsEnabled or ConsumerMonitored or
- QueueBlocked or ConfirmMonitored of
- false -> true = erlang:demonitor(MRef),
- erase({monitoring, QPid});
- true -> ok
- end,
- ok
+demonitor_queue(QPid, State = #ch{queue_monitors = QueueMonitors}) ->
+ case dict:find(QPid, QueueMonitors) of
+ error -> State;
+ {ok, MRef} -> case queue_monitor_needed(QPid, State) of
+ false -> true = erlang:demonitor(MRef),
+ State#ch{queue_monitors =
+ dict:erase(QPid, QueueMonitors)};
+ true -> State
+ end
+queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer,
+ consumer_monitors = ConsumerMonitors,
+ blocking = Blocking,
+ unconfirmed_qm = UQM}) ->
+ StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
+ ConsumerMonitored = gb_sets:is_member(QPid, ConsumerMonitors),
+ QueueBlocked = gb_sets:is_element(QPid, Blocking),
+ ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
+ StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1304,9 +1322,8 @@ ack(Acked, State) ->
ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
[{QPid, length(MsgIds)} | L]
end, [], Acked),
- maybe_incr_stats(QIncs, ack, State),
ok = notify_limiter(State#ch.limiter, Acked),
- State.
+ maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
uncommitted_ack_q = queue:new()}.
@@ -1366,38 +1383,39 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
XName, MsgSeqNo, Message, State),
maybe_incr_stats([{XName, 1} |
[{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State1),
- State1.
+ QPid <- DeliveredQPids]], publish, State1).
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
- record_confirm(MsgSeqNo, XName, State);
+ State1 = maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State),
+ record_confirm(MsgSeqNo, XName, State1);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- maybe_incr_stats([{XName, 1}], return_not_delivered, State),
- record_confirm(MsgSeqNo, XName, State);
+ State1 = maybe_incr_stats([{XName, 1}], return_not_delivered, State),
+ record_confirm(MsgSeqNo, XName, State1);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State,
+ #ch{unconfirmed_mq = UMQ} = State,
UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
SingletonSet = gb_sets:singleton(MsgSeqNo),
- UQM1 = lists:foldl(
- fun (QPid, UQM2) ->
- case gb_trees:lookup(QPid, UQM2) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
- gb_trees:update(QPid, MsgSeqNos1, UQM2);
- none ->
- monitor_queue(QPid),
- gb_trees:insert(QPid, SingletonSet, UQM2)
- end
- end, UQM, QPids),
- State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
+ lists:foldl(
+ fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) ->
+ case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
+ State0#ch{unconfirmed_qm =
+ gb_trees:update(QPid, MsgSeqNos1, UQM)};
+ none ->
+ State1 =
+ State0#ch{unconfirmed_qm =
+ gb_trees:insert(QPid, SingletonSet, UQM)},
+ monitor_queue(QPid, State1)
+ end
+ end, State#ch{unconfirmed_mq = UMQ1}, QPids).
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1418,10 +1436,12 @@ send_nacks(_, State) ->
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
C1 = lists:append(C),
- MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
- MsgSeqNo
- end || {MsgSeqNo, ExchangeName} <- C1 ],
- send_confirms(MsgSeqNos, State #ch{confirmed = []});
+ {MsgSeqNos, State1} =
+ lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
+ {[MsgSeqNo | MSNs],
+ maybe_incr_stats([{ExchangeName, 1}], confirm, State0)}
+ end, {[], State}, C1),
+ send_confirms(MsgSeqNos, State1 #ch{confirmed = []});
send_confirms(State) ->
@@ -1501,23 +1521,26 @@ i(Item, _) ->
maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_stats([{QPid, 1}], redeliver, State);
-maybe_incr_redeliver_stats(_, _, _) ->
- ok.
+maybe_incr_redeliver_stats(_, _, State) ->
+ State.
-maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
+maybe_incr_stats(QXIncs, Measure, State = #ch{stats_timer = StatsTimer}) ->
case rabbit_event:stats_level(StatsTimer) of
- fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
- _ -> ok
+ fine -> lists:foldl(fun ({QX, Inc}, State0) ->
+ incr_stats(QX, Inc, Measure, State0)
+ end, State, QXIncs);
+ _ -> State
-incr_stats({QPid, _} = QX, Inc, Measure) ->
- monitor_queue(QPid),
- update_measures(queue_exchange_stats, QX, Inc, Measure);
-incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
- monitor_queue(QPid),
- update_measures(queue_stats, QPid, Inc, Measure);
-incr_stats(X, Inc, Measure) ->
- update_measures(exchange_stats, X, Inc, Measure).
+incr_stats({QPid, _} = QX, Inc, Measure, State) ->
+ update_measures(queue_exchange_stats, QX, Inc, Measure),
+ monitor_queue(QPid, State);
+incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
+ update_measures(queue_stats, QPid, Inc, Measure),
+ monitor_queue(QPid, State);
+incr_stats(X, Inc, Measure, State) ->
+ update_measures(exchange_stats, X, Inc, Measure),
+ State.
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of