diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-03-22 15:47:53 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-03-22 15:47:53 +0000 |
commit | ed018f70fc43daac70e3c85e7347befde8153fc0 (patch) | |
tree | e46972a05fb1b8c6731fac61c71410ba1d4b096b | |
parent | fb723dce3618250668e75c9da9a91fac4cc2f1fa (diff) | |
parent | d9c055ce65f2d6c2626d23de8cc2d0bceb5aceea (diff) | |
download | rabbitmq-server-ed018f70fc43daac70e3c85e7347befde8153fc0.tar.gz |
merge default into bug24750
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 119 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 13 |
2 files changed, 41 insertions, 91 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 41fc173b..afece3f3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -51,8 +51,7 @@ ttl, ttl_timer_ref, publish_seqno, - unconfirmed_mq, - unconfirmed_qm, + unconfirmed, delayed_stop, queue_monitors, dlx, @@ -138,8 +137,7 @@ init(Q) -> dlx = undefined, dlx_routing_key = undefined, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, @@ -164,8 +162,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, expiry_timer_ref = undefined, ttl = undefined, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = dict:new(), msg_id_to_channel = MTC}, @@ -742,11 +739,9 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> end. dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed_mq = UMQ, - dlx = DLX, - backing_queue = BQ, - backing_queue_state = BQS}) -> + State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC, + dlx = DLX}) -> {ok, _, QPids} = rabbit_basic:publish( rabbit_basic:delivery( @@ -755,20 +750,9 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State1 = lists:foldl(fun monitor_queue/2, State, QPids), State2 = State1#q{publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS), - cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); - _ -> State3 = - lists:foldl( - fun(QPid, State0 = #q{unconfirmed_qm = UQM}) -> - UQM1 = rabbit_misc:gb_trees_set_insert( - QPid, MsgSeqNo, UQM), - State0#q{unconfirmed_qm = UQM1} - end, State2, QPids), - noreply(State3#q{ - unconfirmed_mq = - gb_trees:insert( - MsgSeqNo, {gb_sets:from_list(QPids), - AckTag}, UMQ)}) + [] -> cleanup_after_confirm([AckTag], State2); + _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC), + noreply(State2#q{unconfirmed = UC1}) end. monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> @@ -787,64 +771,31 @@ demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> end. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, - unconfirmed_qm = UQM}) -> + unconfirmed = UC}) -> case dict:find(QPid, QMons) of error -> noreply(State); {ok, _} -> rabbit_log:info("DLQ ~p (for ~s) died~n", [QPid, rabbit_misc:rs(qname(State))]), - State1 = State#q{queue_monitors = dict:erase(QPid, QMons)}, - case gb_trees:lookup(QPid, UQM) of - none -> - noreply(State1); - {value, MsgSeqNosSet} -> - case rabbit_misc:is_abnormal_termination(Reason) of - true -> rabbit_log:warning( - "Dead queue lost ~p messages~n", - [gb_sets:size(MsgSeqNosSet)]); - false -> ok - end, - handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, State1) - end + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), + case (MsgSeqNoAckTags =/= [] andalso + rabbit_misc:is_abnormal_termination(Reason)) of + true -> rabbit_log:warning("Dead queue lost ~p messages~n", + [length(MsgSeqNoAckTags)]); + false -> ok + end, + cleanup_after_confirm( + [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State#q{queue_monitors = dict:erase(QPid, QMons), + unconfirmed = UC1}) end. -handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM, - backing_queue = BQ, - backing_queue_state = BQS}) -> - {AckTags1, UMQ3} = - lists:foldl( - fun (MsgSeqNo, {AckTags, UMQ1}) -> - {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1), - QPids1 = gb_sets:delete(QPid, QPids), - case gb_sets:is_empty(QPids1) of - true -> {[AckTag | AckTags], - gb_trees:delete(MsgSeqNo, UMQ1)}; - false -> {AckTags, gb_trees:update( - MsgSeqNo, {QPids1, AckTag}, UMQ1)} - end - end, {[], UMQ}, MsgSeqNos), - {_Guids, BQS1} = BQ:ack(AckTags1, BQS), - MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), - gb_sets:from_list(MsgSeqNos)), - State1 = case gb_sets:is_empty(MsgSeqNos1) of - false -> State#q{ - unconfirmed_qm = - gb_trees:update(QPid, MsgSeqNos1, UQM)}; - true -> demonitor_queue( - QPid, State#q{ - unconfirmed_qm = - gb_trees:delete(QPid, UQM)}) - end, - cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, - backing_queue_state = BQS1}). - stop_later(Reason, State) -> stop_later(Reason, undefined, noreply, State). -stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> - case {gb_trees:is_empty(UMQ), Reply} of +stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) -> + case {dtree:is_empty(UC), Reply} of {true, noreply} -> {stop, Reason, State}; {true, _} -> @@ -853,16 +804,20 @@ stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> noreply(State#q{delayed_stop = {Reason, {From, Reply}}}) end. -cleanup_after_confirm(State = #q{delayed_stop = DS, - unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) andalso DS =/= undefined of +cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, + unconfirmed = UC, + backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case dtree:is_empty(UC) andalso DS =/= undefined of true -> case DS of {_, {_, noreply}} -> ok; {_, {From, Reply}} -> gen_server2:reply(From, Reply) end, {Reason, _} = DS, - {stop, Reason, State}; - false -> noreply(State) + {stop, Reason, State1}; + false -> noreply(State1) end. already_been_here(_Delivery, #q{dlx = undefined}) -> @@ -1243,8 +1198,14 @@ handle_call(force_event_refresh, _From, end, reply(ok, State). -handle_cast({confirm, MsgSeqNos, QPid}, State) -> - handle_confirm(MsgSeqNos, QPid, State); +handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> + {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), + State1 = case dtree:is_defined(QPid, UC1) of + false -> demonitor_queue(QPid, State); + true -> State + end, + cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State1#q{unconfirmed = UC1}); handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> noreply(State); diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ddf7f574..5b4eed37 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,8 +46,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([dict_cons/3, orddict_cons/3, gb_trees_cons/3, - gb_trees_set_insert/3]). +-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). @@ -176,7 +175,6 @@ -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()). --spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()). -spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A). -spec(gb_trees_foreach/2 :: (fun ((any(), any()) -> any()), gb_tree()) -> 'ok'). @@ -717,15 +715,6 @@ gb_trees_cons(Key, Value, Tree) -> none -> gb_trees:insert(Key, [Value], Tree) end. -gb_trees_set_insert(Key, Value, Tree) -> - case gb_trees:lookup(Key, Tree) of - {value, Values} -> - Values1 = gb_sets:insert(Value, Values), - gb_trees:update(Key, Values1, Tree); - none -> - gb_trees:insert(Key, gb_sets:singleton(Value), Tree) - end. - gb_trees_fold(Fun, Acc, Tree) -> gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))). |