diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/pmon.erl | 64 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 115 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 25 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 21 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 53 | ||||
-rw-r--r-- | src/rabbit_queue_collector.erl | 36 |
7 files changed, 174 insertions, 147 deletions
diff --git a/src/pmon.erl b/src/pmon.erl new file mode 100644 index 00000000..45786577 --- /dev/null +++ b/src/pmon.erl @@ -0,0 +1,64 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% + +-module(pmon). + +-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, + monitored/1, is_empty/1]). + +-ifdef(use_specs). + +%%---------------------------------------------------------------------------- + +-export_type([?MODULE/0]). + +-opaque(?MODULE() :: dict()). + +-spec(new/0 :: () -> ?MODULE()). +-spec(monitor/2 :: (pid(), ?MODULE()) -> ?MODULE()). +-spec(monitor_all/2 :: ([pid()], ?MODULE()) -> ?MODULE()). +-spec(demonitor/2 :: (pid(), ?MODULE()) -> ?MODULE()). +-spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()). +-spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()). +-spec(monitored/1 :: (?MODULE()) -> [pid()]). +-spec(is_empty/1 :: (?MODULE()) -> boolean()). + +-endif. + +new() -> dict:new(). + +monitor(Pid, M) -> + case dict:is_key(Pid, M) of + true -> M; + false -> dict:store(Pid, erlang:monitor(process, Pid), M) + end. + +monitor_all(Pids, M) -> lists:foldl(fun monitor/2, M, Pids). + +demonitor(Pid, M) -> + case dict:find(Pid, M) of + {ok, MRef} -> erlang:demonitor(MRef), + dict:erase(Pid, M); + error -> M + end. + +is_monitored(Pid, M) -> dict:is_key(Pid, M). + +erase(Pid, M) -> dict:erase(Pid, M). + +monitored(M) -> dict:fetch_keys(M). + +is_empty(M) -> dict:size(M) == 0. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 75091692..c1673504 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -109,7 +109,7 @@ -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). --spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(delete_immediately/1 :: (qpids()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -468,8 +468,9 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat). -delete_immediately(#amqqueue{ pid = QPid }) -> - gen_server2:cast(QPid, delete_immediately). +delete_immediately(QPids) -> + [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids], + ok. delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3bd13df1..3caf728b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([start_link/1, info_keys/0]). --export([init_with_backing_queue_state/7]). +-export([init_with_backing_queue_state/8]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -47,6 +47,7 @@ msg_id_to_channel, ttl, ttl_timer_ref, + senders, publish_seqno, unconfirmed, delayed_stop, @@ -74,9 +75,9 @@ -spec(start_link/1 :: (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). --spec(init_with_backing_queue_state/7 :: +-spec(init_with_backing_queue_state/8 :: (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()], - [rabbit_types:delivery()], dict()) -> #q{}). + [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}). -endif. @@ -131,18 +132,19 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, + senders = pmon:new(), dlx = undefined, dlx_routing_key = undefined, publish_seqno = 1, unconfirmed = dtree:empty(), delayed_stop = undefined, - queue_monitors = dict:new(), + queue_monitors = pmon:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, - RateTRef, AckTags, Deliveries, MTC) -> + RateTRef, AckTags, Deliveries, Senders, MTC) -> case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -158,10 +160,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, expiry_timer_ref = undefined, ttl = undefined, + senders = Senders, publish_seqno = 1, unconfirmed = dtree:empty(), delayed_stop = undefined, - queue_monitors = dict:new(), + queue_monitors = pmon:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -605,16 +608,16 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> - case get({ch_publisher, DownPid}) of - undefined -> ok; - MRef -> erlang:demonitor(MRef), - erase({ch_publisher, DownPid}), - credit_flow:peer_down(DownPid) - end, +handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, + senders = Senders}) -> + Senders1 = case pmon:is_monitored(DownPid, Senders) of + false -> Senders; + true -> credit_flow:peer_down(DownPid), + pmon:demonitor(DownPid, Senders) + end, case lookup_ch(DownPid) of not_found -> - {ok, State}; + {ok, State#q{senders = Senders1}}; C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = Blocked} -> @@ -626,7 +629,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers)}, + ChPid, State#q.active_consumers), + senders = Senders1}, case should_auto_delete(State1) of true -> {stop, State1}; false -> {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -736,47 +740,32 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason, rabbit_basic:delivery( false, false, make_dead_letter_msg(DLX, Reason, Msg, State), MsgSeqNo)), - State1 = lists:foldl(fun monitor_queue/2, State, QPids), - State2 = State1#q{publish_seqno = MsgSeqNo + 1}, + State1 = State#q{queue_monitors = pmon:monitor_all( + QPids, State#q.queue_monitors), + publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> cleanup_after_confirm([AckTag], State2); + [] -> cleanup_after_confirm([AckTag], State1); _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC), - noreply(State2#q{unconfirmed = UC1}) - end. - -monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> - case dict:is_key(QPid, QMons) of - true -> State; - false -> State#q{queue_monitors = - dict:store(QPid, erlang:monitor(process, QPid), - QMons)} - end. - -demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> - case dict:find(QPid, QMons) of - {ok, MRef} -> erlang:demonitor(MRef), - State#q{queue_monitors = dict:erase(QPid, QMons)}; - error -> State + noreply(State1#q{unconfirmed = UC1}) end. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, unconfirmed = UC}) -> - case dict:find(QPid, QMons) of - error -> - noreply(State); - {ok, _} -> - case rabbit_misc:is_abnormal_termination(Reason) of - true -> {Lost, _UC1} = dtree:take_all(QPid, UC), - rabbit_log:warning( - "DLQ ~p for ~s died with ~p unconfirmed messages~n", - [QPid, rabbit_misc:rs(qname(State)), length(Lost)]); - false -> ok - end, - {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), - cleanup_after_confirm( - [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State#q{queue_monitors = dict:erase(QPid, QMons), - unconfirmed = UC1}) + case pmon:is_monitored(QPid, QMons) of + false -> noreply(State); + true -> case rabbit_misc:is_abnormal_termination(Reason) of + true -> {Lost, _UC1} = dtree:take_all(QPid, UC), + QNameS = rabbit_misc:rs(qname(State)), + rabbit_log:warning("DLQ ~p for ~s died with " + "~p unconfirmed messages~n", + [QPid, QNameS, length(Lost)]); + false -> ok + end, + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), + cleanup_after_confirm( + [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State#q{queue_monitors = pmon:erase(QPid, QMons), + unconfirmed = UC1}) end. stop_later(Reason, State) -> @@ -1194,7 +1183,8 @@ handle_call(force_event_refresh, _From, handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), State1 = case dtree:is_defined(QPid, UC1) of - false -> demonitor_queue(QPid, State); + false -> QMons = State#q.queue_monitors, + State#q{queue_monitors = pmon:demonitor(QPid, QMons)}; true -> State end, cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], @@ -1208,22 +1198,19 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({deliver, Delivery = #delivery{sender = Sender, msg_seq_no = MsgSeqNo}, Flow}, - State) -> + State = #q{senders = Senders}) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - case Flow of - flow -> Key = {ch_publisher, Sender}, - case get(Key) of - undefined -> put(Key, erlang:monitor(process, Sender)); - _ -> ok - end, - credit_flow:ack(Sender); - noflow -> ok - end, - case already_been_here(Delivery, State) of - false -> noreply(deliver_or_enqueue(Delivery, State)); + Senders1 = case Flow of + flow -> credit_flow:ack(Sender), + pmon:monitor(Sender, Senders); + noflow -> Senders + end, + State1 = State#q{senders = Senders1}, + case already_been_here(Delivery, State1) of + false -> noreply(deliver_or_enqueue(Delivery, State1)); Qs -> log_cycle_once(Qs), rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), - noreply(State) + noreply(State1) end; handle_cast({ack, AckTags, ChPid}, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0e4f3693..846890a1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -194,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, - queue_monitors = sets:new(), + queue_monitors = pmon:new(), consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), @@ -333,8 +333,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State3 = handle_consuming_queue_down(QPid, State2), credit_flow:peer_down(QPid), erase_queue_stats(QPid), - noreply(State3#ch{queue_monitors = - sets:del_element(QPid, State3#ch.queue_monitors)}); + noreply(State3#ch{queue_monitors = pmon:erase( + QPid, State3#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -935,7 +935,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {error, not_found} -> case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner) of - {new, Q = #amqqueue{}} -> + {new, #amqqueue{pid = QPid}} -> %% We need to notify the reader within the channel %% process so that we can be sure there are no %% outstanding exclusive queues being declared as @@ -943,7 +943,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, ok = case Owner of none -> ok; _ -> rabbit_queue_collector:register( - CollectorPid, Q) + CollectorPid, QPid) end, return_queue_declare_ok(QueueName, NoWait, 0, 0, State); {existing, _Q} -> @@ -1089,6 +1089,7 @@ handle_method(_MethodRecord, _Content, _State) -> consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, + queue_monitors = QMons, queue_consumers = QCons, capabilities = Capabilities}) -> case rabbit_misc:table_lookup( @@ -1101,18 +1102,12 @@ consumer_monitor(ConsumerTag, end, gb_sets:singleton(ConsumerTag), QCons), - monitor_queue(QPid, State#ch{queue_consumers = QCons1}); + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + queue_consumers = QCons1}; _ -> State end. -monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case sets:is_element(QPid, QMons) of - false -> erlang:monitor(process, QPid), - State#ch{queue_monitors = sets:add_element(QPid, QMons)}; - true -> State - end. - handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> case rabbit_misc:is_abnormal_termination(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), @@ -1322,7 +1317,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ QNames}, State) -> {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids), + State1 = State#ch{queue_monitors = + pmon:monitor_all(DeliveredQPids, + State#ch.queue_monitors)}, State2 = process_routing_result(RoutingRes, DeliveredQPids, XName, MsgSeqNo, Message, State1), maybe_incr_stats([{XName, 1} | diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index d0b5bab7..2d155d14 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -328,7 +328,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> ensure_gm_heartbeat(), {ok, #state { q = Q, gm = GM1, - monitors = dict:new(), + monitors = pmon:new(), death_fun = DeathFun, length_fun = LengthFun }, hibernate, @@ -353,17 +353,8 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> ok = LengthFun(), noreply(State); -handle_cast({ensure_monitoring, Pids}, - State = #state { monitors = Monitors }) -> - Monitors1 = - lists:foldl(fun (Pid, MonitorsN) -> - case dict:is_key(Pid, MonitorsN) of - true -> MonitorsN; - false -> MRef = erlang:monitor(process, Pid), - dict:store(Pid, MRef, MonitorsN) - end - end, Monitors, Pids), - noreply(State #state { monitors = Monitors1 }). +handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> + noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> gm:broadcast(GM, heartbeat), @@ -371,12 +362,12 @@ handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> noreply(State); handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, - State = #state { monitors = Monitors, + State = #state { monitors = Mons, death_fun = DeathFun }) -> - noreply(case dict:is_key(Pid, Monitors) of + noreply(case pmon:is_monitored(Pid, Mons) of false -> State; true -> ok = DeathFun(Pid), - State #state { monitors = dict:erase(Pid, Monitors) } + State #state { monitors = pmon:erase(Pid, Mons) } end); handle_info(Msg, State) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 0a51bcaa..404cca2c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -140,7 +140,7 @@ init(#amqqueue { name = QueueName } = Q) -> ack_num = 0, msg_id_status = dict:new(), - known_senders = dict:new(), + known_senders = pmon:new(), synchronised = false }, @@ -286,7 +286,7 @@ terminate(Reason, #state { q = Q, rate_timer_ref = RateTRef }) -> ok = gm:leave(GM), QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q, BQ, BQS, RateTRef, [], [], dict:new()), + Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()), rabbit_amqqueue_process:terminate(Reason, QueueState); terminate([_SPid], _Reason) -> %% gm case @@ -459,12 +459,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. - - MonitoringPids = [begin put({ch_publisher, Pid}, MRef), - Pid - end || {Pid, MRef} <- dict:to_list(KS)], - ok = rabbit_mirror_queue_coordinator:ensure_monitoring( - CPid, MonitoringPids), + MPids = pmon:monitored(KS), + ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids), %% We find all the messages that we've received from channels but %% not from gm, and if they're due to be enqueued on promotion @@ -537,7 +533,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Status =:= published orelse Status =:= confirmed]), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, SS, MonitoringPids), + CPid, BQ, BQS, GM, SS, MPids), MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); @@ -550,7 +546,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, {Delivery, true} <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, MTC), + AckTags, Deliveries, KS, MTC), {become, rabbit_amqqueue_process, QueueState, hibernate}. noreply(State) -> @@ -605,14 +601,10 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> State #state { rate_timer_ref = undefined }. ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> - case dict:is_key(ChPid, KS) of - true -> State; - false -> MRef = erlang:monitor(process, ChPid), - State #state { known_senders = dict:store(ChPid, MRef, KS) } - end. + State #state { known_senders = pmon:monitor(ChPid, KS) }. local_sender_death(ChPid, State = #state { known_senders = KS }) -> - ok = case dict:is_key(ChPid, KS) of + ok = case pmon:is_monitored(ChPid, KS) of false -> ok; true -> credit_flow:peer_down(ChPid), confirm_sender_death(ChPid) @@ -628,7 +620,7 @@ confirm_sender_death(Pid) -> fun (?MODULE, State = #state { known_senders = KS, gm = GM }) -> %% We're running still as a slave - ok = case dict:is_key(Pid, KS) of + ok = case pmon:is_monitored(KS) of false -> ok; true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}), confirm_sender_death(Pid) @@ -871,21 +863,18 @@ process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, known_senders = KS }) -> - {ok, case dict:find(ChPid, KS) of - error -> - State; - {ok, MRef} -> - true = erlang:demonitor(MRef), - MS1 = case dict:find(ChPid, SQ) of - error -> - MS; - {ok, {_MQ, PendingCh}} -> - lists:foldl(fun dict:erase/2, MS, - sets:to_list(PendingCh)) - end, - State #state { sender_queues = dict:erase(ChPid, SQ), - msg_id_status = MS1, - known_senders = dict:erase(ChPid, KS) } + {ok, case pmon:is_monitored(ChPid, KS) of + false -> State; + true -> MS1 = case dict:find(ChPid, SQ) of + error -> + MS; + {ok, {_MQ, PendingCh}} -> + lists:foldl(fun dict:erase/2, MS, + sets:to_list(PendingCh)) + end, + State #state { sender_queues = dict:erase(ChPid, SQ), + msg_id_status = MS1, + known_senders = pmon:demonitor(ChPid, KS) } end}; process_instruction({length, Length}, State = #state { backing_queue = BQ, diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index df957d88..6dad01cc 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -23,7 +23,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {queues, delete_from}). +-record(state, {monitors, delete_from}). -include("rabbit.hrl"). @@ -32,7 +32,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). +-spec(register/2 :: (pid(), pid()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). -endif. @@ -51,39 +51,37 @@ delete_all(CollectorPid) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, #state{queues = dict:new(), delete_from = undefined}}. + {ok, #state{monitors = pmon:new(), delete_from = undefined}}. %%-------------------------------------------------------------------------- -handle_call({register, Q}, _From, - State = #state{queues = Queues, delete_from = Deleting}) -> - MonitorRef = erlang:monitor(process, Q#amqqueue.pid), +handle_call({register, QPid}, _From, + State = #state{monitors = QMons, delete_from = Deleting}) -> case Deleting of undefined -> ok; - _ -> rabbit_amqqueue:delete_immediately(Q) + _ -> ok = rabbit_amqqueue:delete_immediately([QPid]) end, - {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}}; + {reply, ok, State#state{monitors = pmon:monitor(QPid, QMons)}}; -handle_call(delete_all, From, State = #state{queues = Queues, +handle_call(delete_all, From, State = #state{monitors = QMons, delete_from = undefined}) -> - case dict:size(Queues) of - 0 -> {reply, ok, State#state{delete_from = From}}; - _ -> [rabbit_amqqueue:delete_immediately(Q) - || {_MRef, Q} <- dict:to_list(Queues)], - {noreply, State#state{delete_from = From}} + case pmon:monitored(QMons) of + [] -> {reply, ok, State#state{delete_from = From}}; + QPids -> ok = rabbit_amqqueue:delete_immediately(QPids), + {noreply, State#state{delete_from = From}} end. handle_cast(Msg, State) -> {stop, {unhandled_cast, Msg}, State}. -handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, - State = #state{queues = Queues, delete_from = Deleting}) -> - Queues1 = dict:erase(MonitorRef, Queues), - case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, + State = #state{monitors = QMons, delete_from = Deleting}) -> + QMons1 = pmon:erase(DownPid, QMons), + case Deleting =/= undefined andalso pmon:is_empty(QMons1) of true -> gen_server:reply(Deleting, ok); false -> ok end, - {noreply, State#state{queues = Queues1}}. + {noreply, State#state{monitors = QMons1}}. terminate(_Reason, _State) -> ok. |