summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/pmon.erl64
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl115
-rw-r--r--src/rabbit_channel.erl25
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl21
-rw-r--r--src/rabbit_mirror_queue_slave.erl53
-rw-r--r--src/rabbit_queue_collector.erl36
7 files changed, 174 insertions, 147 deletions
diff --git a/src/pmon.erl b/src/pmon.erl
new file mode 100644
index 00000000..45786577
--- /dev/null
+++ b/src/pmon.erl
@@ -0,0 +1,64 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(pmon).
+
+-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
+ monitored/1, is_empty/1]).
+
+-ifdef(use_specs).
+
+%%----------------------------------------------------------------------------
+
+-export_type([?MODULE/0]).
+
+-opaque(?MODULE() :: dict()).
+
+-spec(new/0 :: () -> ?MODULE()).
+-spec(monitor/2 :: (pid(), ?MODULE()) -> ?MODULE()).
+-spec(monitor_all/2 :: ([pid()], ?MODULE()) -> ?MODULE()).
+-spec(demonitor/2 :: (pid(), ?MODULE()) -> ?MODULE()).
+-spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()).
+-spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()).
+-spec(monitored/1 :: (?MODULE()) -> [pid()]).
+-spec(is_empty/1 :: (?MODULE()) -> boolean()).
+
+-endif.
+
+new() -> dict:new().
+
+monitor(Pid, M) ->
+ case dict:is_key(Pid, M) of
+ true -> M;
+ false -> dict:store(Pid, erlang:monitor(process, Pid), M)
+ end.
+
+monitor_all(Pids, M) -> lists:foldl(fun monitor/2, M, Pids).
+
+demonitor(Pid, M) ->
+ case dict:find(Pid, M) of
+ {ok, MRef} -> erlang:demonitor(MRef),
+ dict:erase(Pid, M);
+ error -> M
+ end.
+
+is_monitored(Pid, M) -> dict:is_key(Pid, M).
+
+erase(Pid, M) -> dict:erase(Pid, M).
+
+monitored(M) -> dict:fetch_keys(M).
+
+is_empty(M) -> dict:size(M) == 0.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 75091692..c1673504 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -109,7 +109,7 @@
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
--spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_immediately/1 :: (qpids()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -468,8 +468,9 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) ->
delegate_call(QPid, stat).
-delete_immediately(#amqqueue{ pid = QPid }) ->
- gen_server2:cast(QPid, delete_immediately).
+delete_immediately(QPids) ->
+ [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
+ ok.
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3bd13df1..3caf728b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -26,7 +26,7 @@
-export([start_link/1, info_keys/0]).
--export([init_with_backing_queue_state/7]).
+-export([init_with_backing_queue_state/8]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -47,6 +47,7 @@
msg_id_to_channel,
ttl,
ttl_timer_ref,
+ senders,
publish_seqno,
unconfirmed,
delayed_stop,
@@ -74,9 +75,9 @@
-spec(start_link/1 ::
(rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(init_with_backing_queue_state/7 ::
+-spec(init_with_backing_queue_state/8 ::
(rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
- [rabbit_types:delivery()], dict()) -> #q{}).
+ [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}).
-endif.
@@ -131,18 +132,19 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
ttl = undefined,
+ senders = pmon:new(),
dlx = undefined,
dlx_routing_key = undefined,
publish_seqno = 1,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
- queue_monitors = dict:new(),
+ queue_monitors = pmon:new(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
- RateTRef, AckTags, Deliveries, MTC) ->
+ RateTRef, AckTags, Deliveries, Senders, MTC) ->
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -158,10 +160,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rate_timer_ref = RateTRef,
expiry_timer_ref = undefined,
ttl = undefined,
+ senders = Senders,
publish_seqno = 1,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
- queue_monitors = dict:new(),
+ queue_monitors = pmon:new(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
@@ -605,16 +608,16 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
- case get({ch_publisher, DownPid}) of
- undefined -> ok;
- MRef -> erlang:demonitor(MRef),
- erase({ch_publisher, DownPid}),
- credit_flow:peer_down(DownPid)
- end,
+handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
+ senders = Senders}) ->
+ Senders1 = case pmon:is_monitored(DownPid, Senders) of
+ false -> Senders;
+ true -> credit_flow:peer_down(DownPid),
+ pmon:demonitor(DownPid, Senders)
+ end,
case lookup_ch(DownPid) of
not_found ->
- {ok, State};
+ {ok, State#q{senders = Senders1}};
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
@@ -626,7 +629,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers)},
+ ChPid, State#q.active_consumers),
+ senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -736,47 +740,32 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
rabbit_basic:delivery(
false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
MsgSeqNo)),
- State1 = lists:foldl(fun monitor_queue/2, State, QPids),
- State2 = State1#q{publish_seqno = MsgSeqNo + 1},
+ State1 = State#q{queue_monitors = pmon:monitor_all(
+ QPids, State#q.queue_monitors),
+ publish_seqno = MsgSeqNo + 1},
case QPids of
- [] -> cleanup_after_confirm([AckTag], State2);
+ [] -> cleanup_after_confirm([AckTag], State1);
_ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
- noreply(State2#q{unconfirmed = UC1})
- end.
-
-monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
- case dict:is_key(QPid, QMons) of
- true -> State;
- false -> State#q{queue_monitors =
- dict:store(QPid, erlang:monitor(process, QPid),
- QMons)}
- end.
-
-demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
- case dict:find(QPid, QMons) of
- {ok, MRef} -> erlang:demonitor(MRef),
- State#q{queue_monitors = dict:erase(QPid, QMons)};
- error -> State
+ noreply(State1#q{unconfirmed = UC1})
end.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed = UC}) ->
- case dict:find(QPid, QMons) of
- error ->
- noreply(State);
- {ok, _} ->
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
- rabbit_log:warning(
- "DLQ ~p for ~s died with ~p unconfirmed messages~n",
- [QPid, rabbit_misc:rs(qname(State)), length(Lost)]);
- false -> ok
- end,
- {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
- cleanup_after_confirm(
- [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State#q{queue_monitors = dict:erase(QPid, QMons),
- unconfirmed = UC1})
+ case pmon:is_monitored(QPid, QMons) of
+ false -> noreply(State);
+ true -> case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
+ QNameS = rabbit_misc:rs(qname(State)),
+ rabbit_log:warning("DLQ ~p for ~s died with "
+ "~p unconfirmed messages~n",
+ [QPid, QNameS, length(Lost)]);
+ false -> ok
+ end,
+ {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
+ cleanup_after_confirm(
+ [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State#q{queue_monitors = pmon:erase(QPid, QMons),
+ unconfirmed = UC1})
end.
stop_later(Reason, State) ->
@@ -1194,7 +1183,8 @@ handle_call(force_event_refresh, _From,
handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
{MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
State1 = case dtree:is_defined(QPid, UC1) of
- false -> demonitor_queue(QPid, State);
+ false -> QMons = State#q.queue_monitors,
+ State#q{queue_monitors = pmon:demonitor(QPid, QMons)};
true -> State
end,
cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
@@ -1208,22 +1198,19 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({deliver, Delivery = #delivery{sender = Sender,
msg_seq_no = MsgSeqNo}, Flow},
- State) ->
+ State = #q{senders = Senders}) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- case Flow of
- flow -> Key = {ch_publisher, Sender},
- case get(Key) of
- undefined -> put(Key, erlang:monitor(process, Sender));
- _ -> ok
- end,
- credit_flow:ack(Sender);
- noflow -> ok
- end,
- case already_been_here(Delivery, State) of
- false -> noreply(deliver_or_enqueue(Delivery, State));
+ Senders1 = case Flow of
+ flow -> credit_flow:ack(Sender),
+ pmon:monitor(Sender, Senders);
+ noflow -> Senders
+ end,
+ State1 = State#q{senders = Senders1},
+ case already_been_here(Delivery, State1) of
+ false -> noreply(deliver_or_enqueue(Delivery, State1));
Qs -> log_cycle_once(Qs),
rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
- noreply(State)
+ noreply(State1)
end;
handle_cast({ack, AckTags, ChPid}, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0e4f3693..846890a1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -194,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- queue_monitors = sets:new(),
+ queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
@@ -333,8 +333,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State3 = handle_consuming_queue_down(QPid, State2),
credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
- noreply(State3#ch{queue_monitors =
- sets:del_element(QPid, State3#ch.queue_monitors)});
+ noreply(State3#ch{queue_monitors = pmon:erase(
+ QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -935,7 +935,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{error, not_found} ->
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
Args, Owner) of
- {new, Q = #amqqueue{}} ->
+ {new, #amqqueue{pid = QPid}} ->
%% We need to notify the reader within the channel
%% process so that we can be sure there are no
%% outstanding exclusive queues being declared as
@@ -943,7 +943,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
ok = case Owner of
none -> ok;
_ -> rabbit_queue_collector:register(
- CollectorPid, Q)
+ CollectorPid, QPid)
end,
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
{existing, _Q} ->
@@ -1089,6 +1089,7 @@ handle_method(_MethodRecord, _Content, _State) ->
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
+ queue_monitors = QMons,
queue_consumers = QCons,
capabilities = Capabilities}) ->
case rabbit_misc:table_lookup(
@@ -1101,18 +1102,12 @@ consumer_monitor(ConsumerTag,
end,
gb_sets:singleton(ConsumerTag),
QCons),
- monitor_queue(QPid, State#ch{queue_consumers = QCons1});
+ State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+ queue_consumers = QCons1};
_ ->
State
end.
-monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case sets:is_element(QPid, QMons) of
- false -> erlang:monitor(process, QPid),
- State#ch{queue_monitors = sets:add_element(QPid, QMons)};
- true -> State
- end.
-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
case rabbit_misc:is_abnormal_termination(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
@@ -1322,7 +1317,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
QNames}, State) ->
{RoutingRes, DeliveredQPids} =
rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
+ State1 = State#ch{queue_monitors =
+ pmon:monitor_all(DeliveredQPids,
+ State#ch.queue_monitors)},
State2 = process_routing_result(RoutingRes, DeliveredQPids,
XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index d0b5bab7..2d155d14 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -328,7 +328,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
ensure_gm_heartbeat(),
{ok, #state { q = Q,
gm = GM1,
- monitors = dict:new(),
+ monitors = pmon:new(),
death_fun = DeathFun,
length_fun = LengthFun },
hibernate,
@@ -353,17 +353,8 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
ok = LengthFun(),
noreply(State);
-handle_cast({ensure_monitoring, Pids},
- State = #state { monitors = Monitors }) ->
- Monitors1 =
- lists:foldl(fun (Pid, MonitorsN) ->
- case dict:is_key(Pid, MonitorsN) of
- true -> MonitorsN;
- false -> MRef = erlang:monitor(process, Pid),
- dict:store(Pid, MRef, MonitorsN)
- end
- end, Monitors, Pids),
- noreply(State #state { monitors = Monitors1 }).
+handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
+ noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
gm:broadcast(GM, heartbeat),
@@ -371,12 +362,12 @@ handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
noreply(State);
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
- State = #state { monitors = Monitors,
+ State = #state { monitors = Mons,
death_fun = DeathFun }) ->
- noreply(case dict:is_key(Pid, Monitors) of
+ noreply(case pmon:is_monitored(Pid, Mons) of
false -> State;
true -> ok = DeathFun(Pid),
- State #state { monitors = dict:erase(Pid, Monitors) }
+ State #state { monitors = pmon:erase(Pid, Mons) }
end);
handle_info(Msg, State) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 0a51bcaa..404cca2c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -140,7 +140,7 @@ init(#amqqueue { name = QueueName } = Q) ->
ack_num = 0,
msg_id_status = dict:new(),
- known_senders = dict:new(),
+ known_senders = pmon:new(),
synchronised = false
},
@@ -286,7 +286,7 @@ terminate(Reason, #state { q = Q,
rate_timer_ref = RateTRef }) ->
ok = gm:leave(GM),
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q, BQ, BQS, RateTRef, [], [], dict:new()),
+ Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()),
rabbit_amqqueue_process:terminate(Reason, QueueState);
terminate([_SPid], _Reason) ->
%% gm case
@@ -459,12 +459,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
-
- MonitoringPids = [begin put({ch_publisher, Pid}, MRef),
- Pid
- end || {Pid, MRef} <- dict:to_list(KS)],
- ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
- CPid, MonitoringPids),
+ MPids = pmon:monitored(KS),
+ ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids),
%% We find all the messages that we've received from channels but
%% not from gm, and if they're due to be enqueued on promotion
@@ -537,7 +533,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Status =:= published orelse Status =:= confirmed]),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, SS, MonitoringPids),
+ CPid, BQ, BQS, GM, SS, MPids),
MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
@@ -550,7 +546,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
{Delivery, true} <- queue:to_list(PubQ)],
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
- AckTags, Deliveries, MTC),
+ AckTags, Deliveries, KS, MTC),
{become, rabbit_amqqueue_process, QueueState, hibernate}.
noreply(State) ->
@@ -605,14 +601,10 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
State #state { rate_timer_ref = undefined }.
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
- case dict:is_key(ChPid, KS) of
- true -> State;
- false -> MRef = erlang:monitor(process, ChPid),
- State #state { known_senders = dict:store(ChPid, MRef, KS) }
- end.
+ State #state { known_senders = pmon:monitor(ChPid, KS) }.
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
- ok = case dict:is_key(ChPid, KS) of
+ ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
true -> credit_flow:peer_down(ChPid),
confirm_sender_death(ChPid)
@@ -628,7 +620,7 @@ confirm_sender_death(Pid) ->
fun (?MODULE, State = #state { known_senders = KS,
gm = GM }) ->
%% We're running still as a slave
- ok = case dict:is_key(Pid, KS) of
+ ok = case pmon:is_monitored(KS) of
false -> ok;
true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}),
confirm_sender_death(Pid)
@@ -871,21 +863,18 @@ process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
known_senders = KS }) ->
- {ok, case dict:find(ChPid, KS) of
- error ->
- State;
- {ok, MRef} ->
- true = erlang:demonitor(MRef),
- MS1 = case dict:find(ChPid, SQ) of
- error ->
- MS;
- {ok, {_MQ, PendingCh}} ->
- lists:foldl(fun dict:erase/2, MS,
- sets:to_list(PendingCh))
- end,
- State #state { sender_queues = dict:erase(ChPid, SQ),
- msg_id_status = MS1,
- known_senders = dict:erase(ChPid, KS) }
+ {ok, case pmon:is_monitored(ChPid, KS) of
+ false -> State;
+ true -> MS1 = case dict:find(ChPid, SQ) of
+ error ->
+ MS;
+ {ok, {_MQ, PendingCh}} ->
+ lists:foldl(fun dict:erase/2, MS,
+ sets:to_list(PendingCh))
+ end,
+ State #state { sender_queues = dict:erase(ChPid, SQ),
+ msg_id_status = MS1,
+ known_senders = pmon:demonitor(ChPid, KS) }
end};
process_instruction({length, Length},
State = #state { backing_queue = BQ,
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index df957d88..6dad01cc 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -23,7 +23,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {queues, delete_from}).
+-record(state, {monitors, delete_from}).
-include("rabbit.hrl").
@@ -32,7 +32,7 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
+-spec(register/2 :: (pid(), pid()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
-endif.
@@ -51,39 +51,37 @@ delete_all(CollectorPid) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state{queues = dict:new(), delete_from = undefined}}.
+ {ok, #state{monitors = pmon:new(), delete_from = undefined}}.
%%--------------------------------------------------------------------------
-handle_call({register, Q}, _From,
- State = #state{queues = Queues, delete_from = Deleting}) ->
- MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
+handle_call({register, QPid}, _From,
+ State = #state{monitors = QMons, delete_from = Deleting}) ->
case Deleting of
undefined -> ok;
- _ -> rabbit_amqqueue:delete_immediately(Q)
+ _ -> ok = rabbit_amqqueue:delete_immediately([QPid])
end,
- {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}};
+ {reply, ok, State#state{monitors = pmon:monitor(QPid, QMons)}};
-handle_call(delete_all, From, State = #state{queues = Queues,
+handle_call(delete_all, From, State = #state{monitors = QMons,
delete_from = undefined}) ->
- case dict:size(Queues) of
- 0 -> {reply, ok, State#state{delete_from = From}};
- _ -> [rabbit_amqqueue:delete_immediately(Q)
- || {_MRef, Q} <- dict:to_list(Queues)],
- {noreply, State#state{delete_from = From}}
+ case pmon:monitored(QMons) of
+ [] -> {reply, ok, State#state{delete_from = From}};
+ QPids -> ok = rabbit_amqqueue:delete_immediately(QPids),
+ {noreply, State#state{delete_from = From}}
end.
handle_cast(Msg, State) ->
{stop, {unhandled_cast, Msg}, State}.
-handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
- State = #state{queues = Queues, delete_from = Deleting}) ->
- Queues1 = dict:erase(MonitorRef, Queues),
- case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of
+handle_info({'DOWN', _MRef, process, DownPid, _Reason},
+ State = #state{monitors = QMons, delete_from = Deleting}) ->
+ QMons1 = pmon:erase(DownPid, QMons),
+ case Deleting =/= undefined andalso pmon:is_empty(QMons1) of
true -> gen_server:reply(Deleting, ok);
false -> ok
end,
- {noreply, State#state{queues = Queues1}}.
+ {noreply, State#state{monitors = QMons1}}.
terminate(_Reason, _State) ->
ok.