diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-23 13:17:44 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-23 13:17:44 +0100 |
commit | 0a9c5474c0e020fbab0bb204d3d687f51e4ecb87 (patch) | |
tree | 921b1262311e3fa90266d02a6d2b1f604ac756e0 | |
parent | 3604e20a9a898b4b08d08706eba8b4b6b32bb66c (diff) | |
parent | 74f676d5312f6d8d270001b4e2ed464bdb2ff7cd (diff) | |
download | rabbitmq-server-0a9c5474c0e020fbab0bb204d3d687f51e4ecb87.tar.gz |
merge default
-rw-r--r-- | src/dtree.erl | 7 | ||||
-rw-r--r-- | src/pmon.erl | 64 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 212 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 29 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 21 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 58 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 5 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 3 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 11 | ||||
-rw-r--r-- | src/rabbit_queue_collector.erl | 36 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 4 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 17 |
15 files changed, 262 insertions, 223 deletions
diff --git a/src/dtree.erl b/src/dtree.erl index 67bbbc1b..ca2d30cf 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -91,7 +91,7 @@ take(PKs, SK, {P, S}) -> none -> {[], {P, S}}; {value, PKS} -> TakenPKS = gb_sets:from_list(PKs), PKSInter = gb_sets:intersection(PKS, TakenPKS), - PKSDiff = gb_sets:difference (PKS, TakenPKS), + PKSDiff = gb_sets_difference (PKS, PKSInter), {KVs, P1} = take2(PKSInter, SK, P), {KVs, {P1, case gb_sets:is_empty(PKSDiff) of true -> gb_trees:delete(SK, S); @@ -152,9 +152,12 @@ take_all2(PKS, P) -> prune(SKS, PKS, S) -> gb_sets:fold(fun (SK0, S0) -> PKS1 = gb_trees:get(SK0, S0), - PKS2 = gb_sets:difference(PKS1, PKS), + PKS2 = gb_sets_difference(PKS1, PKS), case gb_sets:is_empty(PKS2) of true -> gb_trees:delete(SK0, S0); false -> gb_trees:update(SK0, PKS2, S0) end end, S, SKS). + +gb_sets_difference(S1, S2) -> + gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). diff --git a/src/pmon.erl b/src/pmon.erl new file mode 100644 index 00000000..45786577 --- /dev/null +++ b/src/pmon.erl @@ -0,0 +1,64 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% + +-module(pmon). + +-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, + monitored/1, is_empty/1]). + +-ifdef(use_specs). + +%%---------------------------------------------------------------------------- + +-export_type([?MODULE/0]). + +-opaque(?MODULE() :: dict()). + +-spec(new/0 :: () -> ?MODULE()). +-spec(monitor/2 :: (pid(), ?MODULE()) -> ?MODULE()). +-spec(monitor_all/2 :: ([pid()], ?MODULE()) -> ?MODULE()). +-spec(demonitor/2 :: (pid(), ?MODULE()) -> ?MODULE()). +-spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()). +-spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()). +-spec(monitored/1 :: (?MODULE()) -> [pid()]). +-spec(is_empty/1 :: (?MODULE()) -> boolean()). + +-endif. + +new() -> dict:new(). + +monitor(Pid, M) -> + case dict:is_key(Pid, M) of + true -> M; + false -> dict:store(Pid, erlang:monitor(process, Pid), M) + end. + +monitor_all(Pids, M) -> lists:foldl(fun monitor/2, M, Pids). + +demonitor(Pid, M) -> + case dict:find(Pid, M) of + {ok, MRef} -> erlang:demonitor(MRef), + dict:erase(Pid, M); + error -> M + end. + +is_monitored(Pid, M) -> dict:is_key(Pid, M). + +erase(Pid, M) -> dict:erase(Pid, M). + +monitored(M) -> dict:fetch_keys(M). + +is_empty(M) -> dict:size(M) == 0. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 75091692..c1673504 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -109,7 +109,7 @@ -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). --spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(delete_immediately/1 :: (qpids()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -468,8 +468,9 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat). -delete_immediately(#amqqueue{ pid = QPid }) -> - gen_server2:cast(QPid, delete_immediately). +delete_immediately(QPids) -> + [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids], + ok. delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3bd13df1..6b825607 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([start_link/1, info_keys/0]). --export([init_with_backing_queue_state/7]). +-export([init_with_backing_queue_state/8]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -47,6 +47,7 @@ msg_id_to_channel, ttl, ttl_timer_ref, + senders, publish_seqno, unconfirmed, delayed_stop, @@ -74,9 +75,9 @@ -spec(start_link/1 :: (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). --spec(init_with_backing_queue_state/7 :: +-spec(init_with_backing_queue_state/8 :: (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()], - [rabbit_types:delivery()], dict()) -> #q{}). + [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}). -endif. @@ -131,18 +132,19 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, + senders = pmon:new(), dlx = undefined, dlx_routing_key = undefined, publish_seqno = 1, unconfirmed = dtree:empty(), delayed_stop = undefined, - queue_monitors = dict:new(), + queue_monitors = pmon:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, - RateTRef, AckTags, Deliveries, MTC) -> + RateTRef, AckTags, Deliveries, Senders, MTC) -> case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -158,10 +160,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, expiry_timer_ref = undefined, ttl = undefined, + senders = Senders, publish_seqno = 1, unconfirmed = dtree:empty(), delayed_stop = undefined, - queue_monitors = dict:new(), + queue_monitors = pmon:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -605,16 +608,16 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> - case get({ch_publisher, DownPid}) of - undefined -> ok; - MRef -> erlang:demonitor(MRef), - erase({ch_publisher, DownPid}), - credit_flow:peer_down(DownPid) - end, +handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, + senders = Senders}) -> + Senders1 = case pmon:is_monitored(DownPid, Senders) of + false -> Senders; + true -> credit_flow:peer_down(DownPid), + pmon:demonitor(DownPid, Senders) + end, case lookup_ch(DownPid) of not_found -> - {ok, State}; + {ok, State#q{senders = Senders1}}; C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = Blocked} -> @@ -626,7 +629,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers)}, + ChPid, State#q.active_consumers), + senders = Senders1}, case should_auto_delete(State1) of true -> {stop, State1}; false -> {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -713,6 +717,14 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +ack_if_no_dlx(AckTags, State = #q{dlx = undefined, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State#q{backing_queue_state = BQS1}; +ack_if_no_dlx(_AckTags, State) -> + State. + dead_letter_fun(_Reason, #q{dlx = undefined}) -> undefined; dead_letter_fun(Reason, _State) -> @@ -720,63 +732,51 @@ dead_letter_fun(Reason, _State) -> gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) end. -dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> - case rabbit_exchange:lookup(DLX) of - {error, not_found} -> noreply(State); - _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State) +dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) -> + DLMsg = #basic_message{exchange_name = XName} = + make_dead_letter_msg(Reason, Msg, State), + case rabbit_exchange:lookup(XName) of + {ok, X} -> + Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo), + {Queues, Cycles} = detect_dead_letter_cycles( + DLMsg, rabbit_exchange:route(X, Delivery)), + lists:foreach(fun log_cycle_once/1, Cycles), + QPids = rabbit_amqqueue:lookup(Queues), + {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), + DeliveredQPids; + {error, not_found} -> + [] end. -dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC, - dlx = DLX}) -> - {ok, _, QPids} = - rabbit_basic:publish( - rabbit_basic:delivery( - false, false, make_dead_letter_msg(DLX, Reason, Msg, State), - MsgSeqNo)), - State1 = lists:foldl(fun monitor_queue/2, State, QPids), - State2 = State1#q{publish_seqno = MsgSeqNo + 1}, +dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC}) -> + QPids = dead_letter_publish(Msg, Reason, State), + State1 = State#q{queue_monitors = pmon:monitor_all( + QPids, State#q.queue_monitors), + publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> cleanup_after_confirm([AckTag], State2); + [] -> cleanup_after_confirm([AckTag], State1); _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC), - noreply(State2#q{unconfirmed = UC1}) - end. - -monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> - case dict:is_key(QPid, QMons) of - true -> State; - false -> State#q{queue_monitors = - dict:store(QPid, erlang:monitor(process, QPid), - QMons)} - end. - -demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> - case dict:find(QPid, QMons) of - {ok, MRef} -> erlang:demonitor(MRef), - State#q{queue_monitors = dict:erase(QPid, QMons)}; - error -> State + noreply(State1#q{unconfirmed = UC1}) end. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, unconfirmed = UC}) -> - case dict:find(QPid, QMons) of - error -> - noreply(State); - {ok, _} -> - case rabbit_misc:is_abnormal_termination(Reason) of - true -> {Lost, _UC1} = dtree:take_all(QPid, UC), - rabbit_log:warning( - "DLQ ~p for ~s died with ~p unconfirmed messages~n", - [QPid, rabbit_misc:rs(qname(State)), length(Lost)]); - false -> ok - end, - {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), - cleanup_after_confirm( - [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State#q{queue_monitors = dict:erase(QPid, QMons), - unconfirmed = UC1}) + case pmon:is_monitored(QPid, QMons) of + false -> noreply(State); + true -> case rabbit_misc:is_abnormal_termination(Reason) of + true -> {Lost, _UC1} = dtree:take_all(QPid, UC), + QNameS = rabbit_misc:rs(qname(State)), + rabbit_log:warning("DLQ ~p for ~s died with " + "~p unconfirmed messages~n", + [QPid, QNameS, length(Lost)]); + false -> ok + end, + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), + cleanup_after_confirm( + [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State#q{queue_monitors = pmon:erase(QPid, QMons), + unconfirmed = UC1}) end. stop_later(Reason, State) -> @@ -808,56 +808,58 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, false -> noreply(State1) end. -already_been_here(_Delivery, #q{dlx = undefined}) -> - false; -already_been_here(#delivery{message = #basic_message{content = Content}}, - State) -> +detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), - #resource{name = QueueName} = qname(State), + NoCycles = {Queues, []}, case Headers of undefined -> - false; + NoCycles; _ -> case rabbit_misc:table_lookup(Headers, <<"x-death">>) of {array, DeathTables} -> OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || {table, D} <- DeathTables], OldQueues1 = [QName || {longstr, QName} <- OldQueues], - case lists:member(QueueName, OldQueues1) of - true -> [QueueName | OldQueues1]; - _ -> false - end; + OldQueuesSet = ordsets:from_list(OldQueues1), + {Cycling, NotCycling} = + lists:partition( + fun(Queue) -> + ordsets:is_element(Queue#resource.name, + OldQueuesSet) + end, Queues), + {NotCycling, [[QName | OldQueues1] || + #resource{name = QName} <- Cycling]}; _ -> - false + NoCycles end end. -make_dead_letter_msg(DLX, Reason, +make_dead_letter_msg(Reason, Msg = #basic_message{content = Content, exchange_name = Exchange, routing_keys = RoutingKeys}, - State = #q{dlx_routing_key = DlxRoutingKey}) -> + State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) -> {DeathRoutingKeys, HeadersFun1} = case DlxRoutingKey of undefined -> {RoutingKeys, fun (H) -> H end}; _ -> {[DlxRoutingKey], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, + ReasonBin = list_to_binary(atom_to_list(Reason)), #resource{name = QName} = qname(State), + TimeSec = rabbit_misc:now_ms() div 1000, HeadersFun2 = fun (Headers) -> %% The first routing key is the one specified in the %% basic.publish; all others are CC or BCC keys. - RoutingKeys1 = - [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - Info = [{<<"reason">>, - longstr, list_to_binary(atom_to_list(Reason))}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, - [{longstr, Key} || Key <- RoutingKeys1]}], + RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + RKs1 = [{longstr, Key} || Key <- RKs], + Info = [{<<"reason">>, longstr, ReasonBin}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, TimeSec}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, RKs1}], HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, Info, Headers)) end, @@ -1194,7 +1196,8 @@ handle_call(force_event_refresh, _From, handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), State1 = case dtree:is_defined(QPid, UC1) of - false -> demonitor_queue(QPid, State); + false -> QMons = State#q.queue_monitors, + State#q{queue_monitors = pmon:demonitor(QPid, QMons)}; true -> State end, cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], @@ -1206,25 +1209,16 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender, - msg_seq_no = MsgSeqNo}, Flow}, - State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, + State = #q{senders = Senders}) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - case Flow of - flow -> Key = {ch_publisher, Sender}, - case get(Key) of - undefined -> put(Key, erlang:monitor(process, Sender)); - _ -> ok - end, - credit_flow:ack(Sender); - noflow -> ok - end, - case already_been_here(Delivery, State) of - false -> noreply(deliver_or_enqueue(Delivery, State)); - Qs -> log_cycle_once(Qs), - rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), - noreply(State) - end; + Senders1 = case Flow of + flow -> credit_flow:ack(Sender), + pmon:monitor(Sender, Senders); + noflow -> Senders + end, + State1 = State#q{senders = Senders1}, + noreply(deliver_or_enqueue(Delivery, State1)); handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( @@ -1240,11 +1234,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> ChPid, AckTags, State, case Requeue of true -> fun (State1) -> requeue_and_run(AckTags, State1) end; - false -> Fun = dead_letter_fun(rejected, State), - fun (State1 = #q{backing_queue = BQ, + false -> fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Fun = dead_letter_fun(rejected, State1), BQS1 = BQ:fold(Fun, BQS, AckTags), - State1#q{backing_queue_state = BQS1} + ack_if_no_dlx( + AckTags, + State1#q{backing_queue_state = BQS1}) end end)); diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 7b00fa5f..286b69e4 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) -> {call, ?BQMOD, drain_confirmed, [BQ]}. qc_dropwhile(#state{bqstate = BQ}) -> - {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}. + {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}. qc_is_empty(#state{bqstate = BQ}) -> {call, ?BQMOD, is_empty, [BQ]}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0c1c11d8..846890a1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -194,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, - queue_monitors = sets:new(), + queue_monitors = pmon:new(), consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), @@ -333,8 +333,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State3 = handle_consuming_queue_down(QPid, State2), credit_flow:peer_down(QPid), erase_queue_stats(QPid), - noreply(State3#ch{queue_monitors = - sets:del_element(QPid, State3#ch.queue_monitors)}); + noreply(State3#ch{queue_monitors = pmon:erase( + QPid, State3#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -758,9 +758,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, fun () -> {error, not_found} end, fun () -> rabbit_amqqueue:basic_cancel( - Q, self(), ConsumerTag, - ok_msg(NoWait, #'basic.cancel_ok'{ - consumer_tag = ConsumerTag})) + Q, self(), ConsumerTag, ok_msg(NoWait, OkMsg)) end) of ok -> {noreply, NewState}; @@ -937,7 +935,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {error, not_found} -> case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner) of - {new, Q = #amqqueue{}} -> + {new, #amqqueue{pid = QPid}} -> %% We need to notify the reader within the channel %% process so that we can be sure there are no %% outstanding exclusive queues being declared as @@ -945,7 +943,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, ok = case Owner of none -> ok; _ -> rabbit_queue_collector:register( - CollectorPid, Q) + CollectorPid, QPid) end, return_queue_declare_ok(QueueName, NoWait, 0, 0, State); {existing, _Q} -> @@ -1091,6 +1089,7 @@ handle_method(_MethodRecord, _Content, _State) -> consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, + queue_monitors = QMons, queue_consumers = QCons, capabilities = Capabilities}) -> case rabbit_misc:table_lookup( @@ -1103,18 +1102,12 @@ consumer_monitor(ConsumerTag, end, gb_sets:singleton(ConsumerTag), QCons), - monitor_queue(QPid, State#ch{queue_consumers = QCons1}); + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + queue_consumers = QCons1}; _ -> State end. -monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case sets:is_element(QPid, QMons) of - false -> erlang:monitor(process, QPid), - State#ch{queue_monitors = sets:add_element(QPid, QMons)}; - true -> State - end. - handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> case rabbit_misc:is_abnormal_termination(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), @@ -1324,7 +1317,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ QNames}, State) -> {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids), + State1 = State#ch{queue_monitors = + pmon:monitor_all(DeliveredQPids, + State#ch.queue_monitors)}, State2 = process_routing_result(RoutingRes, DeliveredQPids, XName, MsgSeqNo, Message, State1), maybe_incr_stats([{XName, 1} | diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index d0b5bab7..2d155d14 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -328,7 +328,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> ensure_gm_heartbeat(), {ok, #state { q = Q, gm = GM1, - monitors = dict:new(), + monitors = pmon:new(), death_fun = DeathFun, length_fun = LengthFun }, hibernate, @@ -353,17 +353,8 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> ok = LengthFun(), noreply(State); -handle_cast({ensure_monitoring, Pids}, - State = #state { monitors = Monitors }) -> - Monitors1 = - lists:foldl(fun (Pid, MonitorsN) -> - case dict:is_key(Pid, MonitorsN) of - true -> MonitorsN; - false -> MRef = erlang:monitor(process, Pid), - dict:store(Pid, MRef, MonitorsN) - end - end, Monitors, Pids), - noreply(State #state { monitors = Monitors1 }). +handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> + noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> gm:broadcast(GM, heartbeat), @@ -371,12 +362,12 @@ handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> noreply(State); handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, - State = #state { monitors = Monitors, + State = #state { monitors = Mons, death_fun = DeathFun }) -> - noreply(case dict:is_key(Pid, Monitors) of + noreply(case pmon:is_monitored(Pid, Mons) of false -> State; true -> ok = DeathFun(Pid), - State #state { monitors = dict:erase(Pid, Monitors) } + State #state { monitors = pmon:erase(Pid, Mons) } end); handle_info(Msg, State) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 04b7514f..e6ef5c57 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -246,12 +246,9 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -fold(MsgFun, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS}, AckTags) -> - BQS1 = BQ:fold(MsgFun, BQS, AckTags), - ok = gm:broadcast(GM, {fold, MsgFun, AckTags}), - State #state { backing_queue_state = BQS1 }. +fold(MsgFun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }, AckTags) -> + State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }. requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 0a51bcaa..a7a1273d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -140,7 +140,7 @@ init(#amqqueue { name = QueueName } = Q) -> ack_num = 0, msg_id_status = dict:new(), - known_senders = dict:new(), + known_senders = pmon:new(), synchronised = false }, @@ -286,7 +286,7 @@ terminate(Reason, #state { q = Q, rate_timer_ref = RateTRef }) -> ok = gm:leave(GM), QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q, BQ, BQS, RateTRef, [], [], dict:new()), + Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()), rabbit_amqqueue_process:terminate(Reason, QueueState); terminate([_SPid], _Reason) -> %% gm case @@ -459,12 +459,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. - - MonitoringPids = [begin put({ch_publisher, Pid}, MRef), - Pid - end || {Pid, MRef} <- dict:to_list(KS)], - ok = rabbit_mirror_queue_coordinator:ensure_monitoring( - CPid, MonitoringPids), + MPids = pmon:monitored(KS), + ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids), %% We find all the messages that we've received from channels but %% not from gm, and if they're due to be enqueued on promotion @@ -537,7 +533,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Status =:= published orelse Status =:= confirmed]), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, SS, MonitoringPids), + CPid, BQ, BQS, GM, SS, MPids), MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); @@ -550,7 +546,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, {Delivery, true} <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, MTC), + AckTags, Deliveries, KS, MTC), {become, rabbit_amqqueue_process, QueueState, hibernate}. noreply(State) -> @@ -605,14 +601,10 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> State #state { rate_timer_ref = undefined }. ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> - case dict:is_key(ChPid, KS) of - true -> State; - false -> MRef = erlang:monitor(process, ChPid), - State #state { known_senders = dict:store(ChPid, MRef, KS) } - end. + State #state { known_senders = pmon:monitor(ChPid, KS) }. local_sender_death(ChPid, State = #state { known_senders = KS }) -> - ok = case dict:is_key(ChPid, KS) of + ok = case pmon:is_monitored(ChPid, KS) of false -> ok; true -> credit_flow:peer_down(ChPid), confirm_sender_death(ChPid) @@ -628,7 +620,7 @@ confirm_sender_death(Pid) -> fun (?MODULE, State = #state { known_senders = KS, gm = GM }) -> %% We're running still as a slave - ok = case dict:is_key(Pid, KS) of + ok = case pmon:is_monitored(Pid, KS) of false -> ok; true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}), confirm_sender_death(Pid) @@ -843,11 +835,6 @@ process_instruction({ack, MsgIds}, [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; -process_instruction({fold, MsgFun, AckTags}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - BQS1 = BQ:fold(MsgFun, BQS, AckTags), - {ok, State #state { backing_queue_state = BQS1 }}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -871,21 +858,18 @@ process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, known_senders = KS }) -> - {ok, case dict:find(ChPid, KS) of - error -> - State; - {ok, MRef} -> - true = erlang:demonitor(MRef), - MS1 = case dict:find(ChPid, SQ) of - error -> - MS; - {ok, {_MQ, PendingCh}} -> - lists:foldl(fun dict:erase/2, MS, - sets:to_list(PendingCh)) - end, - State #state { sender_queues = dict:erase(ChPid, SQ), - msg_id_status = MS1, - known_senders = dict:erase(ChPid, KS) } + {ok, case pmon:is_monitored(ChPid, KS) of + false -> State; + true -> MS1 = case dict:find(ChPid, SQ) of + error -> + MS; + {ok, {_MQ, PendingCh}} -> + lists:foldl(fun dict:erase/2, MS, + sets:to_list(PendingCh)) + end, + State #state { sender_queues = dict:erase(ChPid, SQ), + msg_id_status = MS1, + known_senders = pmon:demonitor(ChPid, KS) } end}; process_instruction({length, Length}, State = #state { backing_queue = BQ, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index c1be7613..0aacd654 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -60,6 +60,7 @@ -export([multi_call/2]). -export([quit/1]). -export([os_cmd/1]). +-export([gb_sets_difference/2]). %%---------------------------------------------------------------------------- @@ -204,6 +205,7 @@ ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(quit/1 :: (integer() | string()) -> no_return()). -spec(os_cmd/1 :: (string()) -> string()). +-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). -endif. @@ -912,3 +914,6 @@ os_cmd(Command) -> false -> throw({command_not_found, Exec}); _ -> os:cmd(Command) end. + +gb_sets_difference(S1, S2) -> + gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 56265136..d69dad1f 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1240,7 +1240,8 @@ client_confirm(CRef, MsgIds, ActionTaken, State) -> case dict:find(CRef, CTM) of {ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds), ActionTaken), - MsgIds1 = gb_sets:difference(Gs, MsgIds), + MsgIds1 = rabbit_misc:gb_sets_difference( + Gs, MsgIds), case gb_sets:is_empty(MsgIds1) of true -> dict:erase(CRef, CTM); false -> dict:store(CRef, MsgIds1, CTM) diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 825d1bb1..f0c75d23 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -446,16 +446,19 @@ ipv6_status(TestPort) -> {ok, LSock4} -> gen_tcp:close(LSock4), single_stack; %% IPv6-only machine. Welcome to the future. - {error, eafnosupport} -> ipv6_only; + {error, eafnosupport} -> ipv6_only; %% Linux + {error, eprotonosupport}-> ipv6_only; %% FreeBSD %% Dual stack machine with something already %% on IPv4. {error, _} -> ipv6_status(TestPort + 1) end end; - {error, eafnosupport} -> - %% IPv4-only machine. Welcome to the 90s. + %% IPv4-only machine. Welcome to the 90s. + {error, eafnosupport} -> %% Linux ipv4_only; + {error, eprotonosupport} -> %% FreeBSD + ipv4_only; + %% Port in use {error, _} -> - %% Port in use ipv6_status(TestPort + 1) end. diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index df957d88..6dad01cc 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -23,7 +23,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {queues, delete_from}). +-record(state, {monitors, delete_from}). -include("rabbit.hrl"). @@ -32,7 +32,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). +-spec(register/2 :: (pid(), pid()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). -endif. @@ -51,39 +51,37 @@ delete_all(CollectorPid) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, #state{queues = dict:new(), delete_from = undefined}}. + {ok, #state{monitors = pmon:new(), delete_from = undefined}}. %%-------------------------------------------------------------------------- -handle_call({register, Q}, _From, - State = #state{queues = Queues, delete_from = Deleting}) -> - MonitorRef = erlang:monitor(process, Q#amqqueue.pid), +handle_call({register, QPid}, _From, + State = #state{monitors = QMons, delete_from = Deleting}) -> case Deleting of undefined -> ok; - _ -> rabbit_amqqueue:delete_immediately(Q) + _ -> ok = rabbit_amqqueue:delete_immediately([QPid]) end, - {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}}; + {reply, ok, State#state{monitors = pmon:monitor(QPid, QMons)}}; -handle_call(delete_all, From, State = #state{queues = Queues, +handle_call(delete_all, From, State = #state{monitors = QMons, delete_from = undefined}) -> - case dict:size(Queues) of - 0 -> {reply, ok, State#state{delete_from = From}}; - _ -> [rabbit_amqqueue:delete_immediately(Q) - || {_MRef, Q} <- dict:to_list(Queues)], - {noreply, State#state{delete_from = From}} + case pmon:monitored(QMons) of + [] -> {reply, ok, State#state{delete_from = From}}; + QPids -> ok = rabbit_amqqueue:delete_immediately(QPids), + {noreply, State#state{delete_from = From}} end. handle_cast(Msg, State) -> {stop, {unhandled_cast, Msg}, State}. -handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, - State = #state{queues = Queues, delete_from = Deleting}) -> - Queues1 = dict:erase(MonitorRef, Queues), - case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, + State = #state{monitors = QMons, delete_from = Deleting}) -> + QMons1 = pmon:erase(DownPid, QMons), + case Deleting =/= undefined andalso pmon:is_empty(QMons1) of true -> gen_server:reply(Deleting, ok); false -> ok end, - {noreply, State#state{queues = Queues1}}. + {noreply, State#state{monitors = QMons1}}. terminate(_Reason, _State) -> ok. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e356d7ff..c74b8d5f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2306,8 +2306,8 @@ wait_for_confirms(Unconfirmed) -> true -> ok; false -> receive {'$gen_cast', {confirm, Confirmed, _}} -> wait_for_confirms( - gb_sets:difference(Unconfirmed, - gb_sets:from_list(Confirmed))) + rabbit_misc:gb_sets_difference( + Unconfirmed, gb_sets:from_list(Confirmed))) after 5000 -> exit(timeout_waiting_for_confirm) end end. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 09580261..c3462929 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -589,10 +589,10 @@ dropwhile(Pred, MsgFun, State) -> {_, State2} = internal_fetch(false, MsgStatus, State1), dropwhile(Pred, MsgFun, State2); {true, _} -> - {{_, _, AckTag, _}, State2} = - internal_fetch(true, MsgStatus, State1), - {MsgStatus, State3} = read_msg(MsgStatus, State2), - MsgFun(MsgStatus#msg_status.msg, AckTag), + {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {{Msg, _, AckTag, _}, State3} = + internal_fetch(true, MsgStatus1, State2), + ok = MsgFun(Msg, AckTag), dropwhile(Pred, MsgFun, State3); {false, _} -> a(in_r(MsgStatus, State1)) @@ -1289,10 +1289,11 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC, confirmed = C }) -> - State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet), - msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet), - unconfirmed = gb_sets:difference(UC, MsgIdSet), - confirmed = gb_sets:union (C, MsgIdSet) }. + State #vqstate { + msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet), + msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet), + unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet), + confirmed = gb_sets:union(C, MsgIdSet) }. must_sync_index(#vqstate { msg_indices_on_disk = MIOD, unconfirmed = UC }) -> |