diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-12 11:08:48 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-12 11:08:48 +0100 |
commit | fe05a3c993839b2f2eed8fa9ba54ec61530bedbb (patch) | |
tree | fc0b02f1b93c69788a21abac572520514f68dbdb | |
parent | 0a9e0d95837b1b895a8b39edfd990f99b6999837 (diff) | |
parent | 797c633e2a56ffed0ae338979471a97bc7f26bbb (diff) | |
download | rabbitmq-server-fe05a3c993839b2f2eed8fa9ba54ec61530bedbb.tar.gz |
Merged default
-rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/postrm.in | 9 | ||||
-rw-r--r-- | src/dtree.erl | 160 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 242 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 13 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 103 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 5 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 2 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 13 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 |
12 files changed, 334 insertions, 253 deletions
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index fb02cd6a..e935acf5 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,6 +2,8 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: RabbitMQ Team <packaging@rabbitmq.com> +Uploader: Emile Joubert <emile@rabbitmq.com> +DM-Upload-Allowed: yes Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip Standards-Version: 3.8.0 diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in index baf081fc..c2e9bbfe 100644 --- a/packaging/debs/Debian/debian/postrm.in +++ b/packaging/debs/Debian/debian/postrm.in @@ -35,20 +35,15 @@ case "$1" in if [ -d /etc/rabbitmq ]; then rm -r /etc/rabbitmq fi - remove_plugin_traces + remove_plugin_traces if getent passwd rabbitmq >/dev/null; then # Stop epmd if run by the rabbitmq user pkill -u rabbitmq epmd || : - - deluser rabbitmq - fi - if getent group rabbitmq >/dev/null; then - delgroup rabbitmq fi ;; remove|upgrade) - remove_plugin_traces + remove_plugin_traces ;; failed-upgrade|abort-install|abort-upgrade|disappear) diff --git a/src/dtree.erl b/src/dtree.erl new file mode 100644 index 00000000..67bbbc1b --- /dev/null +++ b/src/dtree.erl @@ -0,0 +1,160 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +%% A dual-index tree. +%% +%% Entries have the following shape: +%% +%% +----+--------------------+---+ +%% | PK | SK1, SK2, ..., SKN | V | +%% +----+--------------------+---+ +%% +%% i.e. a primary key, set of secondary keys, and a value. +%% +%% There can be only one entry per primary key, but secondary keys may +%% appear in multiple entries. +%% +%% The set of secondary keys must be non-empty. Or, to put it another +%% way, entries only exist while their secondary key set is non-empty. + +-module(dtree). + +-export([empty/0, insert/4, take/3, take/2, take_all/2, + is_defined/2, is_empty/1, smallest/1, size/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([?MODULE/0]). + +-opaque(?MODULE() :: {gb_tree(), gb_tree()}). + +-type(pk() :: any()). +-type(sk() :: any()). +-type(val() :: any()). +-type(kv() :: {pk(), val()}). + +-spec(empty/0 :: () -> ?MODULE()). +-spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()). +-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()). +-spec(is_empty/1 :: (?MODULE()) -> boolean()). +-spec(smallest/1 :: (?MODULE()) -> kv()). +-spec(size/1 :: (?MODULE()) -> non_neg_integer()). + +-endif. + +%%---------------------------------------------------------------------------- + +empty() -> {gb_trees:empty(), gb_trees:empty()}. + +%% Insert an entry. Fails if there already is an entry with the given +%% primary key. +insert(PK, [], V, {P, S}) -> + %% dummy insert to force error if PK exists + gb_trees:insert(PK, {gb_sets:empty(), V}, P), + {P, S}; +insert(PK, SKs, V, {P, S}) -> + {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P), + lists:foldl(fun (SK, S0) -> + case gb_trees:lookup(SK, S0) of + {value, PKS} -> PKS1 = gb_sets:insert(PK, PKS), + gb_trees:update(SK, PKS1, S0); + none -> PKS = gb_sets:singleton(PK), + gb_trees:insert(SK, PKS, S0) + end + end, S, SKs)}. + +%% Remove the given secondary key from the entries of the given +%% primary keys, returning the primary-key/value pairs of any entries +%% that were dropped as the result (i.e. due to their secondary key +%% set becoming empty). It is ok for the given primary keys and/or +%% secondary key to not exist. +take(PKs, SK, {P, S}) -> + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> TakenPKS = gb_sets:from_list(PKs), + PKSInter = gb_sets:intersection(PKS, TakenPKS), + PKSDiff = gb_sets:difference (PKS, TakenPKS), + {KVs, P1} = take2(PKSInter, SK, P), + {KVs, {P1, case gb_sets:is_empty(PKSDiff) of + true -> gb_trees:delete(SK, S); + false -> gb_trees:update(SK, PKSDiff, S) + end}} + end. + +%% Remove the given secondary key from all entries, returning the +%% primary-key/value pairs of any entries that were dropped as the +%% result (i.e. due to their secondary key set becoming empty). It is +%% ok for the given secondary key to not exist. +take(SK, {P, S}) -> + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> {KVs, P1} = take2(PKS, SK, P), + {KVs, {P1, gb_trees:delete(SK, S)}} + end. + +%% Drop all entries which contain the given secondary key, returning +%% the primary-key/value pairs of these entries. It is ok for the +%% given secondary key to not exist. +take_all(SK, {P, S}) -> + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P), + {KVs, {P1, prune(SKS, PKS, S)}} + end. + +is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). + +is_empty({P, _S}) -> gb_trees:is_empty(P). + +smallest({P, _S}) -> {K, {_SKS, V}} = gb_trees:smallest(P), + {K, V}. + +size({P, _S}) -> gb_trees:size(P). + +%%---------------------------------------------------------------------------- + +take2(PKS, SK, P) -> + gb_sets:fold(fun (PK, {KVs, P0}) -> + {SKS, V} = gb_trees:get(PK, P0), + SKS1 = gb_sets:delete(SK, SKS), + case gb_sets:is_empty(SKS1) of + true -> KVs1 = [{PK, V} | KVs], + {KVs1, gb_trees:delete(PK, P0)}; + false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)} + end + end, {[], P}, PKS). + +take_all2(PKS, P) -> + gb_sets:fold(fun (PK, {KVs, SKS0, P0}) -> + {SKS, V} = gb_trees:get(PK, P0), + {[{PK, V} | KVs], gb_sets:union(SKS, SKS0), + gb_trees:delete(PK, P0)} + end, {[], gb_sets:empty(), P}, PKS). + +prune(SKS, PKS, S) -> + gb_sets:fold(fun (SK0, S0) -> + PKS1 = gb_trees:get(SK0, S0), + PKS2 = gb_sets:difference(PKS1, PKS), + case gb_sets:is_empty(PKS2) of + true -> gb_trees:delete(SK0, S0); + false -> gb_trees:update(SK0, PKS2, S0) + end + end, S, SKS). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9ecbcbc3..75091692 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -331,7 +331,7 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, - {<<"x-message-ttl">>, fun check_positive_int_arg/2}, + {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, {<<"x-ha-policy">>, fun check_ha_policy_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], @@ -353,11 +353,24 @@ check_string_arg({longstr, _}, _Args) -> check_string_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. -check_positive_int_arg({Type, Val}, _Args) -> +check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of - false -> {error, {unacceptable_type, Type}}; - true when Val =< 0 -> {error, {value_zero_or_less, Val}}; - true -> ok + true -> ok; + false -> {error, {unacceptable_type, Type}} + end. + +check_positive_int_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val > 0 -> ok; + ok -> {error, {value_zero_or_less, Val}}; + Error -> Error + end. + +check_non_neg_int_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_less_than_zero, Val}}; + Error -> Error end. check_dlxrk_arg({longstr, _}, Args) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e1fd9bbc..3bd13df1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -48,8 +48,7 @@ ttl, ttl_timer_ref, publish_seqno, - unconfirmed_mq, - unconfirmed_qm, + unconfirmed, delayed_stop, queue_monitors, dlx, @@ -135,8 +134,7 @@ init(Q) -> dlx = undefined, dlx_routing_key = undefined, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, @@ -161,8 +159,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, expiry_timer_ref = undefined, ttl = undefined, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = dict:new(), msg_id_to_channel = MTC}, @@ -490,8 +487,10 @@ should_confirm_message(#delivery{sender = SenderPid, id = MsgId}}, #q{q = #amqqueue{durable = true}}) -> {eventually, SenderPid, MsgSeqNo, MsgId}; -should_confirm_message(_Delivery, _State) -> - immediately. +should_confirm_message(#delivery{sender = SenderPid, + msg_seq_no = MsgSeqNo}, + _State) -> + {immediately, SenderPid, MsgSeqNo}. needs_confirming({eventually, _, _, _}) -> true; needs_confirming(_) -> false. @@ -500,6 +499,9 @@ maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId}, State = #q{msg_id_to_channel = MTC}) -> State#q{msg_id_to_channel = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)}; +maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) -> + rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), + State; maybe_record_confirm_message(_Confirm, State) -> State. @@ -511,52 +513,50 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(Delivery = #delivery{sender = SenderPid, - message = Message, - msg_seq_no = MsgSeqNo}, +attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - Confirm = should_confirm_message(Delivery, State), - case Confirm of - immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]); - _ -> ok - end, case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> - DeliverFun = - fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - Props = message_properties(Confirm, State1), - {AckTag, BQS3} = BQ:publish_delivered( - AckRequired, Message, Props, - SenderPid, BQS2), - {{Message, false, AckTag}, true, - State1#q{backing_queue_state = BQS3}} - end, - {Delivered, State2} = - deliver_msgs_to_consumers(DeliverFun, false, - State#q{backing_queue_state = BQS1}), - {Delivered, Confirm, State2}; + deliver_msgs_to_consumers( + fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> + Props = message_properties(Confirm, State1), + {AckTag, BQS3} = BQ:publish_delivered( + AckRequired, Message, Props, + SenderPid, BQS2), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS3}} + end, false, State#q{backing_queue_state = BQS1}); {Duplicate, BQS1} -> %% if the message has previously been seen by the BQ then %% it must have been seen under the same circumstances as %% now: i.e. if it is now a deliver_immediately then it %% must have been before. - Delivered = case Duplicate of - published -> true; - discarded -> false - end, - {Delivered, Confirm, State#q{backing_queue_state = BQS1}} + {case Duplicate of + published -> true; + discarded -> false + end, + State#q{backing_queue_state = BQS1}} end. -deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = SenderPid}, State) -> - {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - maybe_record_confirm_message(Confirm, State1), - case Delivered of - true -> State2; - false -> Props = message_properties(Confirm, State), - BQS1 = BQ:publish(Message, Props, SenderPid, BQS), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) +deliver_or_enqueue(Delivery = #delivery{message = Message, + msg_seq_no = MsgSeqNo, + sender = SenderPid}, State) -> + Confirm = should_confirm_message(Delivery, State), + case attempt_delivery(Delivery, Confirm, State) of + {true, State1} -> + maybe_record_confirm_message(Confirm, State1); + %% the next two are optimisations + {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never -> + discard_delivery(Delivery, State1); + {false, State1 = #q{ttl = 0, dlx = undefined}} -> + rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), + discard_delivery(Delivery, State1); + {false, State1} -> + State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = + maybe_record_confirm_message(Confirm, State1), + Props = message_properties(Confirm, State2), + BQS1 = BQ:publish(Message, Props, SenderPid, BQS), + ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -728,11 +728,9 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> end. dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed_mq = UMQ, - dlx = DLX, - backing_queue = BQ, - backing_queue_state = BQS}) -> + State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC, + dlx = DLX}) -> {ok, _, QPids} = rabbit_basic:publish( rabbit_basic:delivery( @@ -741,20 +739,9 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State1 = lists:foldl(fun monitor_queue/2, State, QPids), State2 = State1#q{publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS), - cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); - _ -> State3 = - lists:foldl( - fun(QPid, State0 = #q{unconfirmed_qm = UQM}) -> - UQM1 = rabbit_misc:gb_trees_set_insert( - QPid, MsgSeqNo, UQM), - State0#q{unconfirmed_qm = UQM1} - end, State2, QPids), - noreply(State3#q{ - unconfirmed_mq = - gb_trees:insert( - MsgSeqNo, {gb_sets:from_list(QPids), - AckTag}, UMQ)}) + [] -> cleanup_after_confirm([AckTag], State2); + _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC), + noreply(State2#q{unconfirmed = UC1}) end. monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> @@ -773,64 +760,30 @@ demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> end. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, - unconfirmed_qm = UQM}) -> + unconfirmed = UC}) -> case dict:find(QPid, QMons) of error -> noreply(State); {ok, _} -> - rabbit_log:info("DLQ ~p (for ~s) died~n", - [QPid, rabbit_misc:rs(qname(State))]), - State1 = State#q{queue_monitors = dict:erase(QPid, QMons)}, - case gb_trees:lookup(QPid, UQM) of - none -> - noreply(State1); - {value, MsgSeqNosSet} -> - case rabbit_misc:is_abnormal_termination(Reason) of - true -> rabbit_log:warning( - "Dead queue lost ~p messages~n", - [gb_sets:size(MsgSeqNosSet)]); - false -> ok - end, - handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, State1) - end + case rabbit_misc:is_abnormal_termination(Reason) of + true -> {Lost, _UC1} = dtree:take_all(QPid, UC), + rabbit_log:warning( + "DLQ ~p for ~s died with ~p unconfirmed messages~n", + [QPid, rabbit_misc:rs(qname(State)), length(Lost)]); + false -> ok + end, + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), + cleanup_after_confirm( + [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State#q{queue_monitors = dict:erase(QPid, QMons), + unconfirmed = UC1}) end. -handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM, - backing_queue = BQ, - backing_queue_state = BQS}) -> - {AckTags1, UMQ3} = - lists:foldl( - fun (MsgSeqNo, {AckTags, UMQ1}) -> - {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1), - QPids1 = gb_sets:delete(QPid, QPids), - case gb_sets:is_empty(QPids1) of - true -> {[AckTag | AckTags], - gb_trees:delete(MsgSeqNo, UMQ1)}; - false -> {AckTags, gb_trees:update( - MsgSeqNo, {QPids1, AckTag}, UMQ1)} - end - end, {[], UMQ}, MsgSeqNos), - {_Guids, BQS1} = BQ:ack(AckTags1, BQS), - MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), - gb_sets:from_list(MsgSeqNos)), - State1 = case gb_sets:is_empty(MsgSeqNos1) of - false -> State#q{ - unconfirmed_qm = - gb_trees:update(QPid, MsgSeqNos1, UQM)}; - true -> demonitor_queue( - QPid, State#q{ - unconfirmed_qm = - gb_trees:delete(QPid, UQM)}) - end, - cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, - backing_queue_state = BQS1}). - stop_later(Reason, State) -> stop_later(Reason, undefined, noreply, State). -stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> - case {gb_trees:is_empty(UMQ), Reply} of +stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) -> + case {dtree:is_empty(UC), Reply} of {true, noreply} -> {stop, Reason, State}; {true, _} -> @@ -839,16 +792,20 @@ stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> noreply(State#q{delayed_stop = {Reason, {From, Reply}}}) end. -cleanup_after_confirm(State = #q{delayed_stop = DS, - unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) andalso DS =/= undefined of +cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, + unconfirmed = UC, + backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case dtree:is_empty(UC) andalso DS =/= undefined of true -> case DS of {_, {_, noreply}} -> ok; {_, {From, Reply}} -> gen_server2:reply(From, Reply) end, {Reason, _} = DS, - {stop, Reason, State}; - false -> noreply(State) + {stop, Reason, State1}; + false -> noreply(State1) end. already_been_here(_Delivery, #q{dlx = undefined}) -> @@ -881,29 +838,33 @@ make_dead_letter_msg(DLX, Reason, exchange_name = Exchange, routing_keys = RoutingKeys}, State = #q{dlx_routing_key = DlxRoutingKey}) -> - Headers = rabbit_basic:extract_headers(Content), - #resource{name = QName} = qname(State), - %% The first routing key is the one specified in the - %% basic.publish; all others are CC or BCC keys. - RoutingKeys1 = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - Info = [{<<"reason">>, longstr, list_to_binary(atom_to_list(Reason))}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, - [{longstr, Key} || Key <- RoutingKeys1]}], - Headers1 = rabbit_basic:append_table_header(<<"x-death">>, Info, Headers), - {DeathRoutingKeys, Headers2} = + {DeathRoutingKeys, HeadersFun1} = case DlxRoutingKey of - undefined -> {RoutingKeys, Headers1}; + undefined -> {RoutingKeys, fun (H) -> H end}; _ -> {[DlxRoutingKey], - lists:keydelete(<<"CC">>, 1, Headers1)} + fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, - Content1 = rabbit_basic:replace_headers(Headers2, Content), + #resource{name = QName} = qname(State), + HeadersFun2 = + fun (Headers) -> + %% The first routing key is the one specified in the + %% basic.publish; all others are CC or BCC keys. + RoutingKeys1 = + [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + Info = [{<<"reason">>, + longstr, list_to_binary(atom_to_list(Reason))}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, + [{longstr, Key} || Key <- RoutingKeys1]}], + HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, + Info, Headers)) + end, + Content1 = rabbit_basic:map_headers(HeadersFun2, Content), Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(), routing_keys = DeathRoutingKeys, content = Content1}. - now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> @@ -1094,7 +1055,8 @@ handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + Confirm = should_confirm_message(Delivery, State), + {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), reply(Delivered, case Delivered of true -> maybe_record_confirm_message(Confirm, State1); false -> discard_delivery(Delivery, State1) @@ -1229,8 +1191,14 @@ handle_call(force_event_refresh, _From, end, reply(ok, State). -handle_cast({confirm, MsgSeqNos, QPid}, State) -> - handle_confirm(MsgSeqNos, QPid, State); +handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> + {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), + State1 = case dtree:is_defined(QPid, UC1) of + false -> demonitor_queue(QPid, State); + true -> State + end, + cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State1#q{unconfirmed = UC1}); handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> noreply(State); diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index a89aa074..8ad59016 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -20,7 +20,7 @@ -export([publish/4, publish/6, publish/1, message/3, message/4, properties/1, append_table_header/3, - extract_headers/1, replace_headers/2, delivery/4, header_routes/1]). + extract_headers/1, map_headers/2, delivery/4, header_routes/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -63,8 +63,8 @@ -spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). --spec(replace_headers/2 :: (headers(), rabbit_types:content()) - -> rabbit_types:content()). +-spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers())) + -> rabbit_types:content()). -spec(header_routes/1 :: (undefined | rabbit_framing:amqp_table()) -> [string()]). @@ -193,9 +193,12 @@ extract_headers(Content) -> rabbit_binary_parser:ensure_content_decoded(Content), Headers. -replace_headers(Headers, Content = #content{properties = Props}) -> +map_headers(F, Content) -> + Content1 = rabbit_binary_parser:ensure_content_decoded(Content), + #content{properties = #'P_basic'{headers = Headers} = Props} = Content1, + Headers1 = F(Headers), rabbit_binary_generator:clear_encoded_content( - Content#content{properties = Props#'P_basic'{headers = Headers}}). + Content1#content{properties = Props#'P_basic'{headers = Headers1}}). indexof(L, Element) -> indexof(L, Element, 1). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cac622f8..0c1c11d8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,8 +37,8 @@ uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, - stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, - unconfirmed_qm, confirmed, capabilities, trace_state}). + stats_timer, confirm_enabled, publish_seqno, unconfirmed, + confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -201,8 +201,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), confirmed = [], capabilities = Capabilities, trace_state = rabbit_trace:init(VHost)}, @@ -548,45 +547,9 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State) -> - {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State), - record_confirms(MXs, State1). - -process_confirms(MsgSeqNos, QPid, Nack, State) -> - lists:foldl( - fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UMQ0) of - {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, - Acc, Nack); - none -> Acc - end - end, {[], State}, MsgSeqNos). - -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, - {MXs, State = #ch{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM}}, - Nack) -> - State1 = case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), - case gb_sets:is_empty(MsgSeqNos1) of - true -> UQM1 = gb_trees:delete(QPid, UQM), - State#ch{unconfirmed_qm = UQM1}; - false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), - State#ch{unconfirmed_qm = UQM1} - end; - none -> - State - end, - Qs1 = gb_sets:del_element(QPid, Qs), - %% If QPid somehow died initiating a nack, clear the message from - %% internal data-structures. Also, cleanup empty entries. - case (Nack orelse gb_sets:is_empty(Qs1)) of - true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ), - {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}}; - false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), - {MXs, State1#ch{unconfirmed_mq = UMQ1}} - end. +confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), + record_confirms(MXs, State#ch{unconfirmed = UC1}). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -1152,22 +1115,13 @@ monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> true -> State end. -handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> - MsgSeqNos = case gb_trees:lookup(QPid, UQM) of - {value, MsgSet} -> gb_sets:to_list(MsgSet); - none -> [] - end, - %% We remove the MsgSeqNos from UQM before calling - %% process_confirms to prevent each MsgSeqNo being removed from - %% the set one by one which which would be inefficient - State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, - {Nack, SendFun} = - case rabbit_misc:is_abnormal_termination(Reason) of - true -> {true, fun send_nacks/2}; - false -> {false, fun record_confirms/2} - end, - {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), - SendFun(MXs, State2). +handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> + case rabbit_misc:is_abnormal_termination(Reason) of + true -> {MXs, UC1} = dtree:take_all(QPid, UC), + send_nacks(MXs, State#ch{unconfirmed = UC1}); + false -> {MXs, UC1} = dtree:take(QPid, UC), + record_confirms(MXs, State#ch{unconfirmed = UC1}) + end. handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, @@ -1392,21 +1346,8 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed_mq = UMQ} = State, - UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), - SingletonSet = gb_sets:singleton(MsgSeqNo), - lists:foldl( - fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) -> - case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), - UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), - State0#ch{unconfirmed_qm = UQM1}; - none -> - UQM1 = gb_trees:insert(QPid, SingletonSet, UQM), - State0#ch{unconfirmed_qm = UQM1} - end - end, State#ch{unconfirmed_mq = UMQ1}, QPids). + State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, + State#ch.unconfirmed)}. send_nacks([], State) -> State; @@ -1444,11 +1385,11 @@ send_confirms(Cs, State) -> end, State). coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) -> + State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - CutOff = case gb_trees:is_empty(UMQ) of + CutOff = case dtree:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; - false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo + false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of @@ -1462,8 +1403,8 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, maybe_complete_tx(State = #ch{tx_status = in_progress}) -> State; -maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) of +maybe_complete_tx(State = #ch{unconfirmed = UC}) -> + case dtree:is_empty(UC) of false -> State; true -> complete_tx(State#ch{confirmed = []}) end. @@ -1491,8 +1432,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> - gb_trees:size(UMQ); +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> + dtree:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 83e28c44..910a89b4 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -242,6 +242,11 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). +%% Optimisation +route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}}, + #delivery{message = #basic_message{routing_keys = RKs}}) -> + [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; + route(X = #exchange{name = XName}, Delivery) -> route1(Delivery, {queue:from_list([X]), XName, []}). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 764c3764..04b7514f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -173,11 +173,12 @@ dropwhile(Pred, MsgFun, backing_queue = BQ, set_delivered = SetDelivered, backing_queue_state = BQS }) -> - Len = BQ:len(BQS), + Len = BQ:len(BQS), BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), - Dropped = Len - BQ:len(BQS1), + Len1 = BQ:len(BQS1), + ok = gm:broadcast(GM, {set_length, Len1}), + Dropped = Len - Len1, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), State #state { backing_queue_state = BQS1, set_delivered = SetDelivered1 }. @@ -237,11 +238,11 @@ ack(AckTags, State = #state { gm = GM, backing_queue_state = BQS, ack_msg_id = AM }) -> {MsgIds, BQS1} = BQ:ack(AckTags, BQS), - AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), case MsgIds of [] -> ok; _ -> ok = gm:broadcast(GM, {ack, MsgIds}) end, + AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 98a80a26..0a51bcaa 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -846,7 +846,7 @@ process_instruction({ack, MsgIds}, process_instruction({fold, MsgFun, AckTags}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQS1 = BQ:fold(AckTags, MsgFun, BQS), + BQS1 = BQ:fold(MsgFun, BQS, AckTags), {ok, State #state { backing_queue_state = BQS1 }}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 39409d2f..c1be7613 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,8 +46,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([dict_cons/3, orddict_cons/3, gb_trees_cons/3, - gb_trees_set_insert/3]). +-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). @@ -177,7 +176,6 @@ -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()). --spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()). -spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A). -spec(gb_trees_foreach/2 :: (fun ((any(), any()) -> any()), gb_tree()) -> 'ok'). @@ -719,15 +717,6 @@ gb_trees_cons(Key, Value, Tree) -> none -> gb_trees:insert(Key, [Value], Tree) end. -gb_trees_set_insert(Key, Value, Tree) -> - case gb_trees:lookup(Key, Tree) of - {value, Values} -> - Values1 = gb_sets:insert(Value, Values), - gb_trees:update(Key, Values1, Tree); - none -> - gb_trees:insert(Key, gb_sets:singleton(Value), Tree) - end. - gb_trees_fold(Fun, Acc, Tree) -> gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 55e4a6f8..e356d7ff 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -883,6 +883,8 @@ test_cluster_management() -> "invalid2@invalid"]), ok = assert_ram_node(), + ok = control_action(reset, []), + SecondaryNode = rabbit_nodes:make("hare"), case net_adm:ping(SecondaryNode) of pong -> passed = test_cluster_management2(SecondaryNode); @@ -898,7 +900,6 @@ test_cluster_management2(SecondaryNode) -> SecondaryNodeS = atom_to_list(SecondaryNode), %% make a disk node - ok = control_action(reset, []), ok = control_action(cluster, [NodeS]), ok = assert_disc_node(), %% make a ram node @@ -1244,6 +1245,9 @@ test_confirms() -> }, rabbit_basic:build_content( #'P_basic'{delivery_mode = 2}, <<"">>)), + %% We must not kill the queue before the channel has processed the + %% 'publish'. + ok = rabbit_channel:flush(Ch), %% Crash the queue QPid1 ! boom, %% Wait for a nack |