summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-23 13:17:44 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-23 13:17:44 +0100
commit0a9c5474c0e020fbab0bb204d3d687f51e4ecb87 (patch)
tree921b1262311e3fa90266d02a6d2b1f604ac756e0
parent3604e20a9a898b4b08d08706eba8b4b6b32bb66c (diff)
parent74f676d5312f6d8d270001b4e2ed464bdb2ff7cd (diff)
downloadrabbitmq-server-0a9c5474c0e020fbab0bb204d3d687f51e4ecb87.tar.gz
merge default
-rw-r--r--src/dtree.erl7
-rw-r--r--src/pmon.erl64
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl212
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_channel.erl29
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl21
-rw-r--r--src/rabbit_mirror_queue_master.erl9
-rw-r--r--src/rabbit_mirror_queue_slave.erl58
-rw-r--r--src/rabbit_misc.erl5
-rw-r--r--src/rabbit_msg_store.erl3
-rw-r--r--src/rabbit_networking.erl11
-rw-r--r--src/rabbit_queue_collector.erl36
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_variable_queue.erl17
15 files changed, 262 insertions, 223 deletions
diff --git a/src/dtree.erl b/src/dtree.erl
index 67bbbc1b..ca2d30cf 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -91,7 +91,7 @@ take(PKs, SK, {P, S}) ->
none -> {[], {P, S}};
{value, PKS} -> TakenPKS = gb_sets:from_list(PKs),
PKSInter = gb_sets:intersection(PKS, TakenPKS),
- PKSDiff = gb_sets:difference (PKS, TakenPKS),
+ PKSDiff = gb_sets_difference (PKS, PKSInter),
{KVs, P1} = take2(PKSInter, SK, P),
{KVs, {P1, case gb_sets:is_empty(PKSDiff) of
true -> gb_trees:delete(SK, S);
@@ -152,9 +152,12 @@ take_all2(PKS, P) ->
prune(SKS, PKS, S) ->
gb_sets:fold(fun (SK0, S0) ->
PKS1 = gb_trees:get(SK0, S0),
- PKS2 = gb_sets:difference(PKS1, PKS),
+ PKS2 = gb_sets_difference(PKS1, PKS),
case gb_sets:is_empty(PKS2) of
true -> gb_trees:delete(SK0, S0);
false -> gb_trees:update(SK0, PKS2, S0)
end
end, S, SKS).
+
+gb_sets_difference(S1, S2) ->
+ gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
diff --git a/src/pmon.erl b/src/pmon.erl
new file mode 100644
index 00000000..45786577
--- /dev/null
+++ b/src/pmon.erl
@@ -0,0 +1,64 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(pmon).
+
+-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
+ monitored/1, is_empty/1]).
+
+-ifdef(use_specs).
+
+%%----------------------------------------------------------------------------
+
+-export_type([?MODULE/0]).
+
+-opaque(?MODULE() :: dict()).
+
+-spec(new/0 :: () -> ?MODULE()).
+-spec(monitor/2 :: (pid(), ?MODULE()) -> ?MODULE()).
+-spec(monitor_all/2 :: ([pid()], ?MODULE()) -> ?MODULE()).
+-spec(demonitor/2 :: (pid(), ?MODULE()) -> ?MODULE()).
+-spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()).
+-spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()).
+-spec(monitored/1 :: (?MODULE()) -> [pid()]).
+-spec(is_empty/1 :: (?MODULE()) -> boolean()).
+
+-endif.
+
+new() -> dict:new().
+
+monitor(Pid, M) ->
+ case dict:is_key(Pid, M) of
+ true -> M;
+ false -> dict:store(Pid, erlang:monitor(process, Pid), M)
+ end.
+
+monitor_all(Pids, M) -> lists:foldl(fun monitor/2, M, Pids).
+
+demonitor(Pid, M) ->
+ case dict:find(Pid, M) of
+ {ok, MRef} -> erlang:demonitor(MRef),
+ dict:erase(Pid, M);
+ error -> M
+ end.
+
+is_monitored(Pid, M) -> dict:is_key(Pid, M).
+
+erase(Pid, M) -> dict:erase(Pid, M).
+
+monitored(M) -> dict:fetch_keys(M).
+
+is_empty(M) -> dict:size(M) == 0.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 75091692..c1673504 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -109,7 +109,7 @@
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
--spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_immediately/1 :: (qpids()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -468,8 +468,9 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) ->
delegate_call(QPid, stat).
-delete_immediately(#amqqueue{ pid = QPid }) ->
- gen_server2:cast(QPid, delete_immediately).
+delete_immediately(QPids) ->
+ [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
+ ok.
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3bd13df1..6b825607 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -26,7 +26,7 @@
-export([start_link/1, info_keys/0]).
--export([init_with_backing_queue_state/7]).
+-export([init_with_backing_queue_state/8]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -47,6 +47,7 @@
msg_id_to_channel,
ttl,
ttl_timer_ref,
+ senders,
publish_seqno,
unconfirmed,
delayed_stop,
@@ -74,9 +75,9 @@
-spec(start_link/1 ::
(rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(init_with_backing_queue_state/7 ::
+-spec(init_with_backing_queue_state/8 ::
(rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
- [rabbit_types:delivery()], dict()) -> #q{}).
+ [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}).
-endif.
@@ -131,18 +132,19 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
ttl = undefined,
+ senders = pmon:new(),
dlx = undefined,
dlx_routing_key = undefined,
publish_seqno = 1,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
- queue_monitors = dict:new(),
+ queue_monitors = pmon:new(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
- RateTRef, AckTags, Deliveries, MTC) ->
+ RateTRef, AckTags, Deliveries, Senders, MTC) ->
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -158,10 +160,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rate_timer_ref = RateTRef,
expiry_timer_ref = undefined,
ttl = undefined,
+ senders = Senders,
publish_seqno = 1,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
- queue_monitors = dict:new(),
+ queue_monitors = pmon:new(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
@@ -605,16 +608,16 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
- case get({ch_publisher, DownPid}) of
- undefined -> ok;
- MRef -> erlang:demonitor(MRef),
- erase({ch_publisher, DownPid}),
- credit_flow:peer_down(DownPid)
- end,
+handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
+ senders = Senders}) ->
+ Senders1 = case pmon:is_monitored(DownPid, Senders) of
+ false -> Senders;
+ true -> credit_flow:peer_down(DownPid),
+ pmon:demonitor(DownPid, Senders)
+ end,
case lookup_ch(DownPid) of
not_found ->
- {ok, State};
+ {ok, State#q{senders = Senders1}};
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
@@ -626,7 +629,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers)},
+ ChPid, State#q.active_consumers),
+ senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -713,6 +717,14 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
+ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State#q{backing_queue_state = BQS1};
+ack_if_no_dlx(_AckTags, State) ->
+ State.
+
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
undefined;
dead_letter_fun(Reason, _State) ->
@@ -720,63 +732,51 @@ dead_letter_fun(Reason, _State) ->
gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
end.
-dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
- case rabbit_exchange:lookup(DLX) of
- {error, not_found} -> noreply(State);
- _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State)
+dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) ->
+ DLMsg = #basic_message{exchange_name = XName} =
+ make_dead_letter_msg(Reason, Msg, State),
+ case rabbit_exchange:lookup(XName) of
+ {ok, X} ->
+ Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
+ {Queues, Cycles} = detect_dead_letter_cycles(
+ DLMsg, rabbit_exchange:route(X, Delivery)),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ QPids = rabbit_amqqueue:lookup(Queues),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ DeliveredQPids;
+ {error, not_found} ->
+ []
end.
-dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC,
- dlx = DLX}) ->
- {ok, _, QPids} =
- rabbit_basic:publish(
- rabbit_basic:delivery(
- false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
- MsgSeqNo)),
- State1 = lists:foldl(fun monitor_queue/2, State, QPids),
- State2 = State1#q{publish_seqno = MsgSeqNo + 1},
+dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = UC}) ->
+ QPids = dead_letter_publish(Msg, Reason, State),
+ State1 = State#q{queue_monitors = pmon:monitor_all(
+ QPids, State#q.queue_monitors),
+ publish_seqno = MsgSeqNo + 1},
case QPids of
- [] -> cleanup_after_confirm([AckTag], State2);
+ [] -> cleanup_after_confirm([AckTag], State1);
_ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
- noreply(State2#q{unconfirmed = UC1})
- end.
-
-monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
- case dict:is_key(QPid, QMons) of
- true -> State;
- false -> State#q{queue_monitors =
- dict:store(QPid, erlang:monitor(process, QPid),
- QMons)}
- end.
-
-demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
- case dict:find(QPid, QMons) of
- {ok, MRef} -> erlang:demonitor(MRef),
- State#q{queue_monitors = dict:erase(QPid, QMons)};
- error -> State
+ noreply(State1#q{unconfirmed = UC1})
end.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed = UC}) ->
- case dict:find(QPid, QMons) of
- error ->
- noreply(State);
- {ok, _} ->
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
- rabbit_log:warning(
- "DLQ ~p for ~s died with ~p unconfirmed messages~n",
- [QPid, rabbit_misc:rs(qname(State)), length(Lost)]);
- false -> ok
- end,
- {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
- cleanup_after_confirm(
- [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State#q{queue_monitors = dict:erase(QPid, QMons),
- unconfirmed = UC1})
+ case pmon:is_monitored(QPid, QMons) of
+ false -> noreply(State);
+ true -> case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
+ QNameS = rabbit_misc:rs(qname(State)),
+ rabbit_log:warning("DLQ ~p for ~s died with "
+ "~p unconfirmed messages~n",
+ [QPid, QNameS, length(Lost)]);
+ false -> ok
+ end,
+ {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
+ cleanup_after_confirm(
+ [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State#q{queue_monitors = pmon:erase(QPid, QMons),
+ unconfirmed = UC1})
end.
stop_later(Reason, State) ->
@@ -808,56 +808,58 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
false -> noreply(State1)
end.
-already_been_here(_Delivery, #q{dlx = undefined}) ->
- false;
-already_been_here(#delivery{message = #basic_message{content = Content}},
- State) ->
+detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
- #resource{name = QueueName} = qname(State),
+ NoCycles = {Queues, []},
case Headers of
undefined ->
- false;
+ NoCycles;
_ ->
case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
{array, DeathTables} ->
OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
{table, D} <- DeathTables],
OldQueues1 = [QName || {longstr, QName} <- OldQueues],
- case lists:member(QueueName, OldQueues1) of
- true -> [QueueName | OldQueues1];
- _ -> false
- end;
+ OldQueuesSet = ordsets:from_list(OldQueues1),
+ {Cycling, NotCycling} =
+ lists:partition(
+ fun(Queue) ->
+ ordsets:is_element(Queue#resource.name,
+ OldQueuesSet)
+ end, Queues),
+ {NotCycling, [[QName | OldQueues1] ||
+ #resource{name = QName} <- Cycling]};
_ ->
- false
+ NoCycles
end
end.
-make_dead_letter_msg(DLX, Reason,
+make_dead_letter_msg(Reason,
Msg = #basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys},
- State = #q{dlx_routing_key = DlxRoutingKey}) ->
+ State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) ->
{DeathRoutingKeys, HeadersFun1} =
case DlxRoutingKey of
undefined -> {RoutingKeys, fun (H) -> H end};
_ -> {[DlxRoutingKey],
fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
+ ReasonBin = list_to_binary(atom_to_list(Reason)),
#resource{name = QName} = qname(State),
+ TimeSec = rabbit_misc:now_ms() div 1000,
HeadersFun2 =
fun (Headers) ->
%% The first routing key is the one specified in the
%% basic.publish; all others are CC or BCC keys.
- RoutingKeys1 =
- [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
- Info = [{<<"reason">>,
- longstr, list_to_binary(atom_to_list(Reason))},
- {<<"queue">>, longstr, QName},
- {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
- {<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array,
- [{longstr, Key} || Key <- RoutingKeys1]}],
+ RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
+ RKs1 = [{longstr, Key} || Key <- RKs],
+ Info = [{<<"reason">>, longstr, ReasonBin},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp, TimeSec},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array, RKs1}],
HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
Info, Headers))
end,
@@ -1194,7 +1196,8 @@ handle_call(force_event_refresh, _From,
handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
{MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
State1 = case dtree:is_defined(QPid, UC1) of
- false -> demonitor_queue(QPid, State);
+ false -> QMons = State#q.queue_monitors,
+ State#q{queue_monitors = pmon:demonitor(QPid, QMons)};
true -> State
end,
cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
@@ -1206,25 +1209,16 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender,
- msg_seq_no = MsgSeqNo}, Flow},
- State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
+ State = #q{senders = Senders}) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- case Flow of
- flow -> Key = {ch_publisher, Sender},
- case get(Key) of
- undefined -> put(Key, erlang:monitor(process, Sender));
- _ -> ok
- end,
- credit_flow:ack(Sender);
- noflow -> ok
- end,
- case already_been_here(Delivery, State) of
- false -> noreply(deliver_or_enqueue(Delivery, State));
- Qs -> log_cycle_once(Qs),
- rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
- noreply(State)
- end;
+ Senders1 = case Flow of
+ flow -> credit_flow:ack(Sender),
+ pmon:monitor(Sender, Senders);
+ noflow -> Senders
+ end,
+ State1 = State#q{senders = Senders1},
+ noreply(deliver_or_enqueue(Delivery, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
@@ -1240,11 +1234,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
ChPid, AckTags, State,
case Requeue of
true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
- false -> Fun = dead_letter_fun(rejected, State),
- fun (State1 = #q{backing_queue = BQ,
+ false -> fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ Fun = dead_letter_fun(rejected, State1),
BQS1 = BQ:fold(Fun, BQS, AckTags),
- State1#q{backing_queue_state = BQS1}
+ ack_if_no_dlx(
+ AckTags,
+ State1#q{backing_queue_state = BQS1})
end
end));
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 7b00fa5f..286b69e4 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
{call, ?BQMOD, drain_confirmed, [BQ]}.
qc_dropwhile(#state{bqstate = BQ}) ->
- {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}.
qc_is_empty(#state{bqstate = BQ}) ->
{call, ?BQMOD, is_empty, [BQ]}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0c1c11d8..846890a1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -194,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- queue_monitors = sets:new(),
+ queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
@@ -333,8 +333,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State3 = handle_consuming_queue_down(QPid, State2),
credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
- noreply(State3#ch{queue_monitors =
- sets:del_element(QPid, State3#ch.queue_monitors)});
+ noreply(State3#ch{queue_monitors = pmon:erase(
+ QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -758,9 +758,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
fun () -> {error, not_found} end,
fun () ->
rabbit_amqqueue:basic_cancel(
- Q, self(), ConsumerTag,
- ok_msg(NoWait, #'basic.cancel_ok'{
- consumer_tag = ConsumerTag}))
+ Q, self(), ConsumerTag, ok_msg(NoWait, OkMsg))
end) of
ok ->
{noreply, NewState};
@@ -937,7 +935,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{error, not_found} ->
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
Args, Owner) of
- {new, Q = #amqqueue{}} ->
+ {new, #amqqueue{pid = QPid}} ->
%% We need to notify the reader within the channel
%% process so that we can be sure there are no
%% outstanding exclusive queues being declared as
@@ -945,7 +943,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
ok = case Owner of
none -> ok;
_ -> rabbit_queue_collector:register(
- CollectorPid, Q)
+ CollectorPid, QPid)
end,
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
{existing, _Q} ->
@@ -1091,6 +1089,7 @@ handle_method(_MethodRecord, _Content, _State) ->
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
+ queue_monitors = QMons,
queue_consumers = QCons,
capabilities = Capabilities}) ->
case rabbit_misc:table_lookup(
@@ -1103,18 +1102,12 @@ consumer_monitor(ConsumerTag,
end,
gb_sets:singleton(ConsumerTag),
QCons),
- monitor_queue(QPid, State#ch{queue_consumers = QCons1});
+ State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+ queue_consumers = QCons1};
_ ->
State
end.
-monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case sets:is_element(QPid, QMons) of
- false -> erlang:monitor(process, QPid),
- State#ch{queue_monitors = sets:add_element(QPid, QMons)};
- true -> State
- end.
-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
case rabbit_misc:is_abnormal_termination(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
@@ -1324,7 +1317,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
QNames}, State) ->
{RoutingRes, DeliveredQPids} =
rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
+ State1 = State#ch{queue_monitors =
+ pmon:monitor_all(DeliveredQPids,
+ State#ch.queue_monitors)},
State2 = process_routing_result(RoutingRes, DeliveredQPids,
XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index d0b5bab7..2d155d14 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -328,7 +328,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
ensure_gm_heartbeat(),
{ok, #state { q = Q,
gm = GM1,
- monitors = dict:new(),
+ monitors = pmon:new(),
death_fun = DeathFun,
length_fun = LengthFun },
hibernate,
@@ -353,17 +353,8 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
ok = LengthFun(),
noreply(State);
-handle_cast({ensure_monitoring, Pids},
- State = #state { monitors = Monitors }) ->
- Monitors1 =
- lists:foldl(fun (Pid, MonitorsN) ->
- case dict:is_key(Pid, MonitorsN) of
- true -> MonitorsN;
- false -> MRef = erlang:monitor(process, Pid),
- dict:store(Pid, MRef, MonitorsN)
- end
- end, Monitors, Pids),
- noreply(State #state { monitors = Monitors1 }).
+handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
+ noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
gm:broadcast(GM, heartbeat),
@@ -371,12 +362,12 @@ handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
noreply(State);
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
- State = #state { monitors = Monitors,
+ State = #state { monitors = Mons,
death_fun = DeathFun }) ->
- noreply(case dict:is_key(Pid, Monitors) of
+ noreply(case pmon:is_monitored(Pid, Mons) of
false -> State;
true -> ok = DeathFun(Pid),
- State #state { monitors = dict:erase(Pid, Monitors) }
+ State #state { monitors = pmon:erase(Pid, Mons) }
end);
handle_info(Msg, State) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 04b7514f..e6ef5c57 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -246,12 +246,9 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-fold(MsgFun, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS}, AckTags) ->
- BQS1 = BQ:fold(MsgFun, BQS, AckTags),
- ok = gm:broadcast(GM, {fold, MsgFun, AckTags}),
- State #state { backing_queue_state = BQS1 }.
+fold(MsgFun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 0a51bcaa..a7a1273d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -140,7 +140,7 @@ init(#amqqueue { name = QueueName } = Q) ->
ack_num = 0,
msg_id_status = dict:new(),
- known_senders = dict:new(),
+ known_senders = pmon:new(),
synchronised = false
},
@@ -286,7 +286,7 @@ terminate(Reason, #state { q = Q,
rate_timer_ref = RateTRef }) ->
ok = gm:leave(GM),
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q, BQ, BQS, RateTRef, [], [], dict:new()),
+ Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()),
rabbit_amqqueue_process:terminate(Reason, QueueState);
terminate([_SPid], _Reason) ->
%% gm case
@@ -459,12 +459,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
-
- MonitoringPids = [begin put({ch_publisher, Pid}, MRef),
- Pid
- end || {Pid, MRef} <- dict:to_list(KS)],
- ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
- CPid, MonitoringPids),
+ MPids = pmon:monitored(KS),
+ ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids),
%% We find all the messages that we've received from channels but
%% not from gm, and if they're due to be enqueued on promotion
@@ -537,7 +533,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Status =:= published orelse Status =:= confirmed]),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, SS, MonitoringPids),
+ CPid, BQ, BQS, GM, SS, MPids),
MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
@@ -550,7 +546,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
{Delivery, true} <- queue:to_list(PubQ)],
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
- AckTags, Deliveries, MTC),
+ AckTags, Deliveries, KS, MTC),
{become, rabbit_amqqueue_process, QueueState, hibernate}.
noreply(State) ->
@@ -605,14 +601,10 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
State #state { rate_timer_ref = undefined }.
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
- case dict:is_key(ChPid, KS) of
- true -> State;
- false -> MRef = erlang:monitor(process, ChPid),
- State #state { known_senders = dict:store(ChPid, MRef, KS) }
- end.
+ State #state { known_senders = pmon:monitor(ChPid, KS) }.
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
- ok = case dict:is_key(ChPid, KS) of
+ ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
true -> credit_flow:peer_down(ChPid),
confirm_sender_death(ChPid)
@@ -628,7 +620,7 @@ confirm_sender_death(Pid) ->
fun (?MODULE, State = #state { known_senders = KS,
gm = GM }) ->
%% We're running still as a slave
- ok = case dict:is_key(Pid, KS) of
+ ok = case pmon:is_monitored(Pid, KS) of
false -> ok;
true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}),
confirm_sender_death(Pid)
@@ -843,11 +835,6 @@ process_instruction({ack, MsgIds},
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
-process_instruction({fold, MsgFun, AckTags},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- BQS1 = BQ:fold(MsgFun, BQS, AckTags),
- {ok, State #state { backing_queue_state = BQS1 }};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -871,21 +858,18 @@ process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
known_senders = KS }) ->
- {ok, case dict:find(ChPid, KS) of
- error ->
- State;
- {ok, MRef} ->
- true = erlang:demonitor(MRef),
- MS1 = case dict:find(ChPid, SQ) of
- error ->
- MS;
- {ok, {_MQ, PendingCh}} ->
- lists:foldl(fun dict:erase/2, MS,
- sets:to_list(PendingCh))
- end,
- State #state { sender_queues = dict:erase(ChPid, SQ),
- msg_id_status = MS1,
- known_senders = dict:erase(ChPid, KS) }
+ {ok, case pmon:is_monitored(ChPid, KS) of
+ false -> State;
+ true -> MS1 = case dict:find(ChPid, SQ) of
+ error ->
+ MS;
+ {ok, {_MQ, PendingCh}} ->
+ lists:foldl(fun dict:erase/2, MS,
+ sets:to_list(PendingCh))
+ end,
+ State #state { sender_queues = dict:erase(ChPid, SQ),
+ msg_id_status = MS1,
+ known_senders = pmon:demonitor(ChPid, KS) }
end};
process_instruction({length, Length},
State = #state { backing_queue = BQ,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index c1be7613..0aacd654 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -60,6 +60,7 @@
-export([multi_call/2]).
-export([quit/1]).
-export([os_cmd/1]).
+-export([gb_sets_difference/2]).
%%----------------------------------------------------------------------------
@@ -204,6 +205,7 @@
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(quit/1 :: (integer() | string()) -> no_return()).
-spec(os_cmd/1 :: (string()) -> string()).
+-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
-endif.
@@ -912,3 +914,6 @@ os_cmd(Command) ->
false -> throw({command_not_found, Exec});
_ -> os:cmd(Command)
end.
+
+gb_sets_difference(S1, S2) ->
+ gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 56265136..d69dad1f 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1240,7 +1240,8 @@ client_confirm(CRef, MsgIds, ActionTaken, State) ->
case dict:find(CRef, CTM) of
{ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds),
ActionTaken),
- MsgIds1 = gb_sets:difference(Gs, MsgIds),
+ MsgIds1 = rabbit_misc:gb_sets_difference(
+ Gs, MsgIds),
case gb_sets:is_empty(MsgIds1) of
true -> dict:erase(CRef, CTM);
false -> dict:store(CRef, MsgIds1, CTM)
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 825d1bb1..f0c75d23 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -446,16 +446,19 @@ ipv6_status(TestPort) ->
{ok, LSock4} -> gen_tcp:close(LSock4),
single_stack;
%% IPv6-only machine. Welcome to the future.
- {error, eafnosupport} -> ipv6_only;
+ {error, eafnosupport} -> ipv6_only; %% Linux
+ {error, eprotonosupport}-> ipv6_only; %% FreeBSD
%% Dual stack machine with something already
%% on IPv4.
{error, _} -> ipv6_status(TestPort + 1)
end
end;
- {error, eafnosupport} ->
- %% IPv4-only machine. Welcome to the 90s.
+ %% IPv4-only machine. Welcome to the 90s.
+ {error, eafnosupport} -> %% Linux
ipv4_only;
+ {error, eprotonosupport} -> %% FreeBSD
+ ipv4_only;
+ %% Port in use
{error, _} ->
- %% Port in use
ipv6_status(TestPort + 1)
end.
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index df957d88..6dad01cc 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -23,7 +23,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {queues, delete_from}).
+-record(state, {monitors, delete_from}).
-include("rabbit.hrl").
@@ -32,7 +32,7 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
+-spec(register/2 :: (pid(), pid()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
-endif.
@@ -51,39 +51,37 @@ delete_all(CollectorPid) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state{queues = dict:new(), delete_from = undefined}}.
+ {ok, #state{monitors = pmon:new(), delete_from = undefined}}.
%%--------------------------------------------------------------------------
-handle_call({register, Q}, _From,
- State = #state{queues = Queues, delete_from = Deleting}) ->
- MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
+handle_call({register, QPid}, _From,
+ State = #state{monitors = QMons, delete_from = Deleting}) ->
case Deleting of
undefined -> ok;
- _ -> rabbit_amqqueue:delete_immediately(Q)
+ _ -> ok = rabbit_amqqueue:delete_immediately([QPid])
end,
- {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}};
+ {reply, ok, State#state{monitors = pmon:monitor(QPid, QMons)}};
-handle_call(delete_all, From, State = #state{queues = Queues,
+handle_call(delete_all, From, State = #state{monitors = QMons,
delete_from = undefined}) ->
- case dict:size(Queues) of
- 0 -> {reply, ok, State#state{delete_from = From}};
- _ -> [rabbit_amqqueue:delete_immediately(Q)
- || {_MRef, Q} <- dict:to_list(Queues)],
- {noreply, State#state{delete_from = From}}
+ case pmon:monitored(QMons) of
+ [] -> {reply, ok, State#state{delete_from = From}};
+ QPids -> ok = rabbit_amqqueue:delete_immediately(QPids),
+ {noreply, State#state{delete_from = From}}
end.
handle_cast(Msg, State) ->
{stop, {unhandled_cast, Msg}, State}.
-handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
- State = #state{queues = Queues, delete_from = Deleting}) ->
- Queues1 = dict:erase(MonitorRef, Queues),
- case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of
+handle_info({'DOWN', _MRef, process, DownPid, _Reason},
+ State = #state{monitors = QMons, delete_from = Deleting}) ->
+ QMons1 = pmon:erase(DownPid, QMons),
+ case Deleting =/= undefined andalso pmon:is_empty(QMons1) of
true -> gen_server:reply(Deleting, ok);
false -> ok
end,
- {noreply, State#state{queues = Queues1}}.
+ {noreply, State#state{monitors = QMons1}}.
terminate(_Reason, _State) ->
ok.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e356d7ff..c74b8d5f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2306,8 +2306,8 @@ wait_for_confirms(Unconfirmed) ->
true -> ok;
false -> receive {'$gen_cast', {confirm, Confirmed, _}} ->
wait_for_confirms(
- gb_sets:difference(Unconfirmed,
- gb_sets:from_list(Confirmed)))
+ rabbit_misc:gb_sets_difference(
+ Unconfirmed, gb_sets:from_list(Confirmed)))
after 5000 -> exit(timeout_waiting_for_confirm)
end
end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 09580261..c3462929 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -589,10 +589,10 @@ dropwhile(Pred, MsgFun, State) ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
dropwhile(Pred, MsgFun, State2);
{true, _} ->
- {{_, _, AckTag, _}, State2} =
- internal_fetch(true, MsgStatus, State1),
- {MsgStatus, State3} = read_msg(MsgStatus, State2),
- MsgFun(MsgStatus#msg_status.msg, AckTag),
+ {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ {{Msg, _, AckTag, _}, State3} =
+ internal_fetch(true, MsgStatus1, State2),
+ ok = MsgFun(Msg, AckTag),
dropwhile(Pred, MsgFun, State3);
{false, _} ->
a(in_r(MsgStatus, State1))
@@ -1289,10 +1289,11 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC,
confirmed = C }) ->
- State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet),
- msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet),
- unconfirmed = gb_sets:difference(UC, MsgIdSet),
- confirmed = gb_sets:union (C, MsgIdSet) }.
+ State #vqstate {
+ msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet),
+ msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet),
+ unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
+ confirmed = gb_sets:union(C, MsgIdSet) }.
must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->