diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-15 13:36:52 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-15 13:36:52 +0100 |
commit | 8477b66ab395918475b66ae9f756d45f41cc2548 (patch) | |
tree | 75f126f36d49cdfe834aee70aa1f12e294c24e12 | |
parent | 8acdda1f848597d9a403e02aaffbfb270e0767bf (diff) | |
download | rabbitmq-server-8477b66ab395918475b66ae9f756d45f41cc2548.tar.gz |
monitor_queue/demonitor_queue are symmetric; don't use process dictionary
-rw-r--r-- | src/rabbit_channel.erl | 275 |
1 files changed, 149 insertions, 126 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 684c4206..2948067a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,8 @@ user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, - unconfirmed_qm, confirmed, capabilities, trace_state}). + unconfirmed_qm, confirmed, capabilities, trace_state, + queue_monitors}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -200,7 +201,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, unconfirmed_qm = gb_trees:empty(), confirmed = [], capabilities = Capabilities, - trace_state = rabbit_trace:init(VHost)}, + trace_state = rabbit_trace:init(VHost), + queue_monitors = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> emit_stats(State) end), @@ -299,13 +301,13 @@ handle_cast({deliver, ConsumerTag, AckRequired, exchange = ExchangeName#resource.name, routing_key = RoutingKey}, rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), - maybe_incr_stats([{QPid, 1}], case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State), - maybe_incr_redeliver_stats(Redelivered, QPid, State), + State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State1), + State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), rabbit_trace:tap_trace_out(Msg, TraceState), - noreply(State1#ch{next_tag = DeliveryTag + 1}); + noreply(State3#ch{next_tag = DeliveryTag + 1}); handle_cast(force_event_refresh, State) -> @@ -524,9 +526,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> #'channel.flow_ok'{active = false}); false -> ok end, - State1 = State#ch{blocking = Blocking1}, - demonitor_queue(QPid, State1), - State1 + demonitor_queue(QPid, State#ch{blocking = Blocking1}) end. record_confirm(undefined, _, State) -> @@ -545,41 +545,43 @@ confirm(MsgSeqNos, QPid, State) -> {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State), record_confirms(MXs, State1). -process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM}) -> - {MXs, UMQ1, UQM1} = - lists:foldl( - fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UMQ0) of - {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, - Acc, Nack, State); - none -> Acc - end - end, {[], UMQ, UQM}, MsgSeqNos), - {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. - -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack, State) -> - UQM1 = case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), - case gb_sets:is_empty(MsgSeqNos1) of - true -> UQM2 = gb_trees:delete(QPid, UQM), - demonitor_queue(QPid, - State#ch{unconfirmed_qm = UQM2}), - UQM2; - false -> gb_trees:update(QPid, MsgSeqNos1, UQM) - end; - none -> - UQM - end, +process_confirms(MsgSeqNos, QPid, Nack, State) -> + lists:foldl( + fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UMQ0) of + {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, + Acc, Nack); + none -> Acc + end + end, {[], State}, MsgSeqNos). + +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, + {MXs, State = #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM}}, + Nack) -> + State1 = + case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), + case gb_sets:is_empty(MsgSeqNos1) of + true -> demonitor_queue( + QPid, State#ch{unconfirmed_qm = + gb_trees:delete(QPid, UQM)}); + false -> State#ch{unconfirmed_qm = + gb_trees:update(QPid, MsgSeqNos1, UQM)} + end; + none -> + State + end, Qs1 = gb_sets:del_element(QPid, Qs), %% If QPid somehow died initiating a nack, clear the message from %% internal data-structures. Also, cleanup empty entries. case (Nack orelse gb_sets:is_empty(Qs1)) of true -> - {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1}; + {[{MsgSeqNo, XName} | MXs], + State1#ch{unconfirmed_mq = gb_trees:delete(MsgSeqNo, UMQ)}}; false -> - {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1} + {MXs, State1#ch{unconfirmed_mq = + gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ)}} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -696,11 +698,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, State1 = lock_message(not(NoAck), ack_record(DeliveryTag, none, Msg), State), - maybe_incr_stats([{QPid, 1}], case NoAck of - true -> get_no_ack; - false -> get - end, State), - maybe_incr_redeliver_stats(Redelivered, QPid, State), + State2 = maybe_incr_stats([{QPid, 1}], case NoAck of + true -> get_no_ack; + false -> get + end, State1), + State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), rabbit_trace:tap_trace_out(Msg, TraceState), ok = rabbit_writer:send_command( WriterPid, @@ -710,7 +712,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, State1#ch{next_tag = DeliveryTag + 1}}; + {noreply, State3#ch{next_tag = DeliveryTag + 1}}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -769,7 +771,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{consumer_mapping = ConsumerMapping, + _, State = #ch{consumer_mapping = ConsumerMapping, consumer_monitors = ConsumerMonitors}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of @@ -777,15 +779,18 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); {ok, Q = #amqqueue{pid = QPid}} -> - ConsumerMonitors1 = case get({monitoring, QPid}) of - undefined -> ConsumerMonitors; - _ -> gb_sets:delete(QPid, - ConsumerMonitors) - end, - NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag, - ConsumerMapping), - consumer_monitors = ConsumerMonitors1}, - ok = demonitor_queue(Q, NewState), + ConsumerMapping1 = dict:erase(ConsumerTag,ConsumerMapping), + ConsumerMonitors1 = + case dict:size(dict:filter(fun (_, #amqqueue{pid = QPid0}) + when QPid0 =:= QPid -> true; + (_, _) -> false + end, ConsumerMapping1)) of + 0 -> gb_sets:delete(QPid, ConsumerMonitors); + _ -> ConsumerMonitors + end, + State1 = State#ch{consumer_mapping = ConsumerMapping1, + consumer_monitors = ConsumerMonitors1}, + State2 = demonitor_queue(Q, State1), %% In order to ensure that no more messages are sent to %% the consumer after the cancel_ok has been sent, we get %% the queue process to send the cancel_ok on our @@ -800,10 +805,10 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, consumer_tag = ConsumerTag})) end) of ok -> - {noreply, NewState}; + {noreply, State2}; {error, not_found} -> %% Spec requires we ignore this situation. - return_ok(NewState, NoWait, OkMsg) + return_ok(State2, NoWait, OkMsg) end end; @@ -1109,10 +1114,15 @@ handle_method(#'channel.flow'{active = false}, _, State1 = State#ch{limiter = Limiter1}, ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of - [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> Queues = [begin monitor_queue(QPid), QPid end || QPid <- QPids], - ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, State1#ch{blocking = gb_sets:from_list(Queues)}} + [] -> + {reply, #'channel.flow_ok'{active = false}, State1}; + QPids -> + State2 = lists:foldl(fun (QPid, State0) -> monitor_queue(QPid, State0) + end, + State1#ch{blocking = gb_sets:from_list(QPids)}, + QPids), + ok = rabbit_amqqueue:flush_all(QPids, self()), + {noreply, State2} end; handle_method(_MethodRecord, _Content, _State) -> @@ -1121,12 +1131,17 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -monitor_queue(QPid) -> - case get({monitoring, QPid}) of - undefined -> MRef = erlang:monitor(process, QPid), - put({monitoring, QPid}, MRef), - MRef; - MRef -> MRef +monitor_queue(QPid, State = #ch{queue_monitors = QueueMonitors}) -> + case dict:is_key(QPid, QueueMonitors) of + false -> case queue_monitor_needed(QPid, State) of + false -> State; + true -> QueueMonitors1 = dict:store(QPid, + erlang:monitor(process, + QPid), + QueueMonitors), + State#ch{queue_monitors = QueueMonitors1} + end; + true -> State end. consumer_monitor(ConsumerTag, @@ -1137,31 +1152,34 @@ consumer_monitor(ConsumerTag, Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping), - monitor_queue(QPid), - State#ch{consumer_monitors = gb_sets:add(QPid, ConsumerMonitors)}; + monitor_queue(QPid, + State#ch{consumer_monitors = + gb_sets:add(QPid, ConsumerMonitors)}); _ -> State end. -demonitor_queue(QPid, #ch{stats_timer = StatsTimer, - consumer_monitors = ConsumerMonitors, - blocking = Blocking, - unconfirmed_qm = UQM}) -> - case get({monitoring, QPid}) of - undefined -> ok; - MRef -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, - ConsumerMonitored = gb_sets:is_member(QPid, ConsumerMonitors), - QueueBlocked = gb_sets:is_element(QPid, Blocking), - ConfirmMonitored = gb_trees:is_defined(QPid, UQM), - case StatsEnabled or ConsumerMonitored or - QueueBlocked or ConfirmMonitored of - false -> true = erlang:demonitor(MRef), - erase({monitoring, QPid}); - true -> ok - end, - ok +demonitor_queue(QPid, State = #ch{queue_monitors = QueueMonitors}) -> + case dict:find(QPid, QueueMonitors) of + error -> State; + {ok, MRef} -> case queue_monitor_needed(QPid, State) of + false -> true = erlang:demonitor(MRef), + State#ch{queue_monitors = + dict:erase(QPid, QueueMonitors)}; + true -> State + end end. +queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer, + consumer_monitors = ConsumerMonitors, + blocking = Blocking, + unconfirmed_qm = UQM}) -> + StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, + ConsumerMonitored = gb_sets:is_member(QPid, ConsumerMonitors), + QueueBlocked = gb_sets:is_element(QPid, Blocking), + ConfirmMonitored = gb_trees:is_defined(QPid, UQM), + StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored. + handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); @@ -1304,9 +1322,8 @@ ack(Acked, State) -> ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), [{QPid, length(MsgIds)} | L] end, [], Acked), - maybe_incr_stats(QIncs, ack, State), ok = notify_limiter(State#ch.limiter, Acked), - State. + maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), uncommitted_ack_q = queue:new()}. @@ -1366,38 +1383,39 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ XName, MsgSeqNo, Message, State), maybe_incr_stats([{XName, 1} | [{{QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State1), - State1. + QPid <- DeliveredQPids]], publish, State1). process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], - return_unroutable, State), - record_confirm(MsgSeqNo, XName, State); + State1 = maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], + return_unroutable, State), + record_confirm(MsgSeqNo, XName, State1); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_consumers), - maybe_incr_stats([{XName, 1}], return_not_delivered, State), - record_confirm(MsgSeqNo, XName, State); + State1 = maybe_incr_stats([{XName, 1}], return_not_delivered, State), + record_confirm(MsgSeqNo, XName, State1); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State, + #ch{unconfirmed_mq = UMQ} = State, UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), SingletonSet = gb_sets:singleton(MsgSeqNo), - UQM1 = lists:foldl( - fun (QPid, UQM2) -> - case gb_trees:lookup(QPid, UQM2) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), - gb_trees:update(QPid, MsgSeqNos1, UQM2); - none -> - monitor_queue(QPid), - gb_trees:insert(QPid, SingletonSet, UQM2) - end - end, UQM, QPids), - State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}. + lists:foldl( + fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) -> + case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), + State0#ch{unconfirmed_qm = + gb_trees:update(QPid, MsgSeqNos1, UQM)}; + none -> + State1 = + State0#ch{unconfirmed_qm = + gb_trees:insert(QPid, SingletonSet, UQM)}, + monitor_queue(QPid, State1) + end + end, State#ch{unconfirmed_mq = UMQ1}, QPids). lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1418,10 +1436,12 @@ send_nacks(_, State) -> send_confirms(State = #ch{tx_status = none, confirmed = C}) -> C1 = lists:append(C), - MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), - MsgSeqNo - end || {MsgSeqNo, ExchangeName} <- C1 ], - send_confirms(MsgSeqNos, State #ch{confirmed = []}); + {MsgSeqNos, State1} = + lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) -> + {[MsgSeqNo | MSNs], + maybe_incr_stats([{ExchangeName, 1}], confirm, State0)} + end, {[], State}, C1), + send_confirms(MsgSeqNos, State1 #ch{confirmed = []}); send_confirms(State) -> maybe_complete_tx(State). @@ -1501,23 +1521,26 @@ i(Item, _) -> maybe_incr_redeliver_stats(true, QPid, State) -> maybe_incr_stats([{QPid, 1}], redeliver, State); -maybe_incr_redeliver_stats(_, _, _) -> - ok. +maybe_incr_redeliver_stats(_, _, State) -> + State. -maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) -> +maybe_incr_stats(QXIncs, Measure, State = #ch{stats_timer = StatsTimer}) -> case rabbit_event:stats_level(StatsTimer) of - fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; - _ -> ok + fine -> lists:foldl(fun ({QX, Inc}, State0) -> + incr_stats(QX, Inc, Measure, State0) + end, State, QXIncs); + _ -> State end. -incr_stats({QPid, _} = QX, Inc, Measure) -> - monitor_queue(QPid), - update_measures(queue_exchange_stats, QX, Inc, Measure); -incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> - monitor_queue(QPid), - update_measures(queue_stats, QPid, Inc, Measure); -incr_stats(X, Inc, Measure) -> - update_measures(exchange_stats, X, Inc, Measure). +incr_stats({QPid, _} = QX, Inc, Measure, State) -> + update_measures(queue_exchange_stats, QX, Inc, Measure), + monitor_queue(QPid, State); +incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) -> + update_measures(queue_stats, QPid, Inc, Measure), + monitor_queue(QPid, State); +incr_stats(X, Inc, Measure, State) -> + update_measures(exchange_stats, X, Inc, Measure), + State. update_measures(Type, QX, Inc, Measure) -> Measures = case get({Type, QX}) of |