diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-04-02 14:23:38 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-04-02 14:23:38 +0100 |
commit | d7caa83278adc50d18e36cb633524f2991d4831d (patch) | |
tree | 91d4b2da15131bd7e9d0ff5e22da619825307a01 | |
parent | 007a84fdb711e9fe488cf33df9c5d9622cb58383 (diff) | |
parent | 48a1033868039175b0d27fe9daf99501d6152159 (diff) | |
download | rabbitmq-server-d7caa83278adc50d18e36cb633524f2991d4831d.tar.gz |
Merge bug24750.
-rw-r--r-- | src/dtree.erl | 111 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 119 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 102 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 13 |
4 files changed, 173 insertions, 172 deletions
diff --git a/src/dtree.erl b/src/dtree.erl new file mode 100644 index 00000000..265bb340 --- /dev/null +++ b/src/dtree.erl @@ -0,0 +1,111 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +%% A dual-index tree. +%% +%% Conceptually, what we want is a map that has two distinct sets of +%% keys (referred to here as primary and secondary, although that +%% shouldn't imply a hierarchy) pointing to one set of +%% values. However, in practice what we'll always want to do is insert +%% a value that's pointed at by (one primary, many secondaries) and +%% remove values that are pointed at by (one secondary, many +%% primaries) or (one secondary, all primaries). Thus the API. +%% +%% Entries exists while they have a non-empty secondary key set. The +%% 'take' operations return the entries that got removed, i.e. that +%% had no remaining secondary keys. take/3 expects entries to exist +%% with the supplied primary keys and secondary key. take/2 can cope +%% with the supplied secondary key having no entries. + +-module(dtree). + +-export([empty/0, insert/4, take/3, take/2, + is_defined/2, is_empty/1, smallest/1, size/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([?MODULE/0]). + +-opaque(?MODULE() :: {gb_tree(), gb_tree()}). + +-type(pk() :: any()). +-type(sk() :: any()). +-type(val() :: any()). +-type(kv() :: {pk(), val()}). + +-spec(empty/0 :: () -> ?MODULE()). +-spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()). +-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()). +-spec(is_empty/1 :: (?MODULE()) -> boolean()). +-spec(smallest/1 :: (?MODULE()) -> kv()). +-spec(size/1 :: (?MODULE()) -> non_neg_integer()). + +-endif. + +%%---------------------------------------------------------------------------- + +empty() -> {gb_trees:empty(), gb_trees:empty()}. + +insert(PK, SKs, V, {P, S}) -> + {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P), + lists:foldl(fun (SK, S0) -> + case gb_trees:lookup(SK, S0) of + {value, PKS} -> PKS1 = gb_sets:insert(PK, PKS), + gb_trees:update(SK, PKS1, S0); + none -> PKS = gb_sets:singleton(PK), + gb_trees:insert(SK, PKS, S0) + end + end, S, SKs)}. + +take(PKs, SK, {P, S}) -> + {KVs, P1} = take2(PKs, SK, P), + PKS = gb_sets:difference(gb_trees:get(SK, S), gb_sets:from_list(PKs)), + {KVs, {P1, case gb_sets:is_empty(PKS) of + true -> gb_trees:delete(SK, S); + false -> gb_trees:update(SK, PKS, S) + end}}. + +take(SK, {P, S}) -> + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> {KVs, P1} = take2(gb_sets:to_list(PKS), SK, P), + {KVs, {P1, gb_trees:delete(SK, S)}} + end. + +is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). + +is_empty({P, _S}) -> gb_trees:is_empty(P). + +smallest({P, _S}) -> {K, {_SKS, V}} = gb_trees:smallest(P), + {K, V}. + +size({P, _S}) -> gb_trees:size(P). + +%%---------------------------------------------------------------------------- + +take2(PKs, SK, P) -> + lists:foldl(fun (PK, {KVs, P0}) -> + {SKS, V} = gb_trees:get(PK, P0), + SKS1 = gb_sets:delete(SK, SKS), + case gb_sets:is_empty(SKS1) of + true -> {[{PK, V} | KVs], gb_trees:delete(PK, P0)}; + false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)} + end + end, {[], P}, PKs). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e1fd9bbc..7c1e4573 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -48,8 +48,7 @@ ttl, ttl_timer_ref, publish_seqno, - unconfirmed_mq, - unconfirmed_qm, + unconfirmed, delayed_stop, queue_monitors, dlx, @@ -135,8 +134,7 @@ init(Q) -> dlx = undefined, dlx_routing_key = undefined, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, @@ -161,8 +159,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, expiry_timer_ref = undefined, ttl = undefined, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = dict:new(), msg_id_to_channel = MTC}, @@ -728,11 +725,9 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> end. dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed_mq = UMQ, - dlx = DLX, - backing_queue = BQ, - backing_queue_state = BQS}) -> + State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC, + dlx = DLX}) -> {ok, _, QPids} = rabbit_basic:publish( rabbit_basic:delivery( @@ -741,20 +736,9 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State1 = lists:foldl(fun monitor_queue/2, State, QPids), State2 = State1#q{publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS), - cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); - _ -> State3 = - lists:foldl( - fun(QPid, State0 = #q{unconfirmed_qm = UQM}) -> - UQM1 = rabbit_misc:gb_trees_set_insert( - QPid, MsgSeqNo, UQM), - State0#q{unconfirmed_qm = UQM1} - end, State2, QPids), - noreply(State3#q{ - unconfirmed_mq = - gb_trees:insert( - MsgSeqNo, {gb_sets:from_list(QPids), - AckTag}, UMQ)}) + [] -> cleanup_after_confirm([AckTag], State2); + _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC), + noreply(State2#q{unconfirmed = UC1}) end. monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> @@ -773,64 +757,31 @@ demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> end. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, - unconfirmed_qm = UQM}) -> + unconfirmed = UC}) -> case dict:find(QPid, QMons) of error -> noreply(State); {ok, _} -> rabbit_log:info("DLQ ~p (for ~s) died~n", [QPid, rabbit_misc:rs(qname(State))]), - State1 = State#q{queue_monitors = dict:erase(QPid, QMons)}, - case gb_trees:lookup(QPid, UQM) of - none -> - noreply(State1); - {value, MsgSeqNosSet} -> - case rabbit_misc:is_abnormal_termination(Reason) of - true -> rabbit_log:warning( - "Dead queue lost ~p messages~n", - [gb_sets:size(MsgSeqNosSet)]); - false -> ok - end, - handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, State1) - end + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), + case (MsgSeqNoAckTags =/= [] andalso + rabbit_misc:is_abnormal_termination(Reason)) of + true -> rabbit_log:warning("Dead queue lost ~p messages~n", + [length(MsgSeqNoAckTags)]); + false -> ok + end, + cleanup_after_confirm( + [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State#q{queue_monitors = dict:erase(QPid, QMons), + unconfirmed = UC1}) end. -handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM, - backing_queue = BQ, - backing_queue_state = BQS}) -> - {AckTags1, UMQ3} = - lists:foldl( - fun (MsgSeqNo, {AckTags, UMQ1}) -> - {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1), - QPids1 = gb_sets:delete(QPid, QPids), - case gb_sets:is_empty(QPids1) of - true -> {[AckTag | AckTags], - gb_trees:delete(MsgSeqNo, UMQ1)}; - false -> {AckTags, gb_trees:update( - MsgSeqNo, {QPids1, AckTag}, UMQ1)} - end - end, {[], UMQ}, MsgSeqNos), - {_Guids, BQS1} = BQ:ack(AckTags1, BQS), - MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), - gb_sets:from_list(MsgSeqNos)), - State1 = case gb_sets:is_empty(MsgSeqNos1) of - false -> State#q{ - unconfirmed_qm = - gb_trees:update(QPid, MsgSeqNos1, UQM)}; - true -> demonitor_queue( - QPid, State#q{ - unconfirmed_qm = - gb_trees:delete(QPid, UQM)}) - end, - cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, - backing_queue_state = BQS1}). - stop_later(Reason, State) -> stop_later(Reason, undefined, noreply, State). -stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> - case {gb_trees:is_empty(UMQ), Reply} of +stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) -> + case {dtree:is_empty(UC), Reply} of {true, noreply} -> {stop, Reason, State}; {true, _} -> @@ -839,16 +790,20 @@ stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> noreply(State#q{delayed_stop = {Reason, {From, Reply}}}) end. -cleanup_after_confirm(State = #q{delayed_stop = DS, - unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) andalso DS =/= undefined of +cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, + unconfirmed = UC, + backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case dtree:is_empty(UC) andalso DS =/= undefined of true -> case DS of {_, {_, noreply}} -> ok; {_, {From, Reply}} -> gen_server2:reply(From, Reply) end, {Reason, _} = DS, - {stop, Reason, State}; - false -> noreply(State) + {stop, Reason, State1}; + false -> noreply(State1) end. already_been_here(_Delivery, #q{dlx = undefined}) -> @@ -1229,8 +1184,14 @@ handle_call(force_event_refresh, _From, end, reply(ok, State). -handle_cast({confirm, MsgSeqNos, QPid}, State) -> - handle_confirm(MsgSeqNos, QPid, State); +handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> + {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), + State1 = case dtree:is_defined(QPid, UC1) of + false -> demonitor_queue(QPid, State); + true -> State + end, + cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State1#q{unconfirmed = UC1}); handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> noreply(State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cac622f8..4a0e93be 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,8 +37,8 @@ uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, - stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, - unconfirmed_qm, confirmed, capabilities, trace_state}). + stats_timer, confirm_enabled, publish_seqno, unconfirmed, + confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -201,8 +201,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), confirmed = [], capabilities = Capabilities, trace_state = rabbit_trace:init(VHost)}, @@ -548,45 +547,9 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State) -> - {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State), - record_confirms(MXs, State1). - -process_confirms(MsgSeqNos, QPid, Nack, State) -> - lists:foldl( - fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UMQ0) of - {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, - Acc, Nack); - none -> Acc - end - end, {[], State}, MsgSeqNos). - -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, - {MXs, State = #ch{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM}}, - Nack) -> - State1 = case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), - case gb_sets:is_empty(MsgSeqNos1) of - true -> UQM1 = gb_trees:delete(QPid, UQM), - State#ch{unconfirmed_qm = UQM1}; - false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), - State#ch{unconfirmed_qm = UQM1} - end; - none -> - State - end, - Qs1 = gb_sets:del_element(QPid, Qs), - %% If QPid somehow died initiating a nack, clear the message from - %% internal data-structures. Also, cleanup empty entries. - case (Nack orelse gb_sets:is_empty(Qs1)) of - true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ), - {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}}; - false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), - {MXs, State1#ch{unconfirmed_mq = UMQ1}} - end. +confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), + record_confirms(MXs, State#ch{unconfirmed = UC1}). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -1152,22 +1115,12 @@ monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> true -> State end. -handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> - MsgSeqNos = case gb_trees:lookup(QPid, UQM) of - {value, MsgSet} -> gb_sets:to_list(MsgSet); - none -> [] - end, - %% We remove the MsgSeqNos from UQM before calling - %% process_confirms to prevent each MsgSeqNo being removed from - %% the set one by one which which would be inefficient - State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, - {Nack, SendFun} = - case rabbit_misc:is_abnormal_termination(Reason) of - true -> {true, fun send_nacks/2}; - false -> {false, fun record_confirms/2} - end, - {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), - SendFun(MXs, State2). +handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(QPid, UC), + (case rabbit_misc:is_abnormal_termination(Reason) of + true -> fun send_nacks/2; + false -> fun record_confirms/2 + end)(MXs, State#ch{unconfirmed = UC1}). handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, @@ -1392,21 +1345,8 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed_mq = UMQ} = State, - UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), - SingletonSet = gb_sets:singleton(MsgSeqNo), - lists:foldl( - fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) -> - case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), - UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), - State0#ch{unconfirmed_qm = UQM1}; - none -> - UQM1 = gb_trees:insert(QPid, SingletonSet, UQM), - State0#ch{unconfirmed_qm = UQM1} - end - end, State#ch{unconfirmed_mq = UMQ1}, QPids). + State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, + State#ch.unconfirmed)}. send_nacks([], State) -> State; @@ -1444,11 +1384,11 @@ send_confirms(Cs, State) -> end, State). coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) -> + State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - CutOff = case gb_trees:is_empty(UMQ) of + CutOff = case dtree:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; - false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo + false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of @@ -1462,8 +1402,8 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, maybe_complete_tx(State = #ch{tx_status = in_progress}) -> State; -maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) of +maybe_complete_tx(State = #ch{unconfirmed = UC}) -> + case dtree:is_empty(UC) of false -> State; true -> complete_tx(State#ch{confirmed = []}) end. @@ -1491,8 +1431,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> - gb_trees:size(UMQ); +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> + dtree:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 39409d2f..c1be7613 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,8 +46,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([dict_cons/3, orddict_cons/3, gb_trees_cons/3, - gb_trees_set_insert/3]). +-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). @@ -177,7 +176,6 @@ -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()). --spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()). -spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A). -spec(gb_trees_foreach/2 :: (fun ((any(), any()) -> any()), gb_tree()) -> 'ok'). @@ -719,15 +717,6 @@ gb_trees_cons(Key, Value, Tree) -> none -> gb_trees:insert(Key, [Value], Tree) end. -gb_trees_set_insert(Key, Value, Tree) -> - case gb_trees:lookup(Key, Tree) of - {value, Values} -> - Values1 = gb_sets:insert(Value, Values), - gb_trees:update(Key, Values1, Tree); - none -> - gb_trees:insert(Key, gb_sets:singleton(Value), Tree) - end. - gb_trees_fold(Fun, Acc, Tree) -> gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))). |