diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-10 21:22:26 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-10 21:22:26 +0000 |
commit | 7372c6f05bd26d0e480183429d09be44676e5338 (patch) | |
tree | 6fd41089b040fdf490b61c96982141f94198077f | |
parent | 91c02ce7f181d6edb8f3c40fcc25bc55c3b1bc29 (diff) | |
parent | 22d129f63132032fa014ba78ca149b1b0b6106c7 (diff) | |
download | rabbitmq-server-7372c6f05bd26d0e480183429d09be44676e5338.tar.gz |
merge bug23593 into default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 36 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 186 |
2 files changed, 93 insertions, 129 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 80dc651a..fde54346 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -425,15 +425,33 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -confirm_messages(Guids, State) -> - lists:foldl(fun confirm_message_by_guid/2, State, Guids). - -confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> - case dict:find(Guid, GTC) of - {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); - _ -> ok +confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> + {CMs, GTC1} = + lists:foldl( + fun(Guid, {CMs, GTC0}) -> + case dict:find(Guid, GTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)}; + _ -> + {CMs, GTC0} + end + end, {[], GTC}, Guids), + case lists:usort(CMs) of + [{Ch, MsgSeqNo} | CMs1] -> + [rabbit_channel:confirm(ChPid, MsgSeqNos) || + {ChPid, MsgSeqNos} <- group_confirms_by_channel( + CMs1, [{Ch, [MsgSeqNo]}])]; + [] -> + ok end, - State#q{guid_to_channel = dict:erase(Guid, GTC)}. + State#q{guid_to_channel = GTC1}. + +group_confirms_by_channel([], Acc) -> + Acc; +group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) -> + group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]); +group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) -> + group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]). record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> {no_confirm, State}; @@ -467,7 +485,7 @@ attempt_delivery(#delivery{txn = none, %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming case {NeedsConfirming, MsgSeqNo} of {_, undefined} -> ok; - {no_confirm, _} -> rabbit_channel:confirm(ChPid, MsgSeqNo); + {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); {confirm, _} -> ok end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2067e306..579d7f49 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,7 +36,7 @@ -behaviour(gen_server2). -export([start_link/7, do/2, do/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]). +-export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1]). @@ -49,8 +49,7 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, confirm_multiple, confirm_tref, - held_confirms, unconfirmed, queues_for_msg}). + confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -72,8 +71,6 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(FLUSH_CONFIRMS_INTERVAL, 1000). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -97,8 +94,7 @@ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). --spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). --spec(flush_confirms/1 :: (pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -137,11 +133,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). -confirm(Pid, MsgSeqNo) -> - gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). - -flush_confirms(Pid) -> - gen_server2:cast(Pid, flush_confirms). +confirm(Pid, MsgSeqNos) -> + gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). list() -> pg_local:get_members(rabbit_channels). @@ -193,8 +186,6 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 0, - confirm_multiple = false, - held_confirms = gb_sets:new(), unconfirmed = gb_sets:new(), queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), @@ -292,11 +283,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -handle_cast(flush_confirms, State) -> - {noreply, internal_flush_confirms(State)}; - -handle_cast({confirm, MsgSeqNo, From}, State) -> - {noreply, confirm(MsgSeqNo, From, State)}. +handle_cast({confirm, MsgSeqNos, From}, State) -> + {noreply, confirm(MsgSeqNos, From, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{queues_for_msg = QFM}) -> @@ -304,7 +292,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> Qs = sets:del_element(QPid, QPids), case sets:size(Qs) of - 0 -> confirm(Msg, QPid, State0); + 0 -> confirm([Msg], QPid, State0); _ -> State0#ch{queues_for_msg = dict:store(Msg, Qs, QFM0)} end @@ -312,16 +300,15 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - State1 = internal_flush_confirms(State), rabbit_event:if_enabled(StatsTimer, fun () -> internal_emit_stats( State, [{idle_since, now()}]) end), StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), - {hibernate, State1#ch{stats_timer = StatsTimer1}}. + {hibernate, State#ch{stats_timer = StatsTimer1}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -484,51 +471,39 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -confirm(undefined, _QPid, State) -> +confirm([], _QPid, State) -> State; -confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> +confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) -> State; -confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> - do_if_unconfirmed(MsgSeqNo, QPid, - fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{ - delivery_tag = MSN}), - State1 - end, State); -confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> - do_if_unconfirmed(MsgSeqNo, QPid, - fun(MSN, State1 = #ch{held_confirms = As}) -> - start_confirm_timer( - State1#ch{held_confirms = gb_sets:add(MSN, As)}) - end, State). - -do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, - State = #ch{unconfirmed = UC, - queues_for_msg = QFM}) -> - %% clears references to MsgSeqNo and does ConfirmFun - case gb_sets:is_element(MsgSeqNo, UC) of - true -> - Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC), - case QPid of - undefined -> - ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1}); - _ -> - {ok, Qs} = dict:find(MsgSeqNo, QFM), - Qs1 = sets:del_element(QPid, Qs), - case sets:size(Qs1) of - 0 -> ConfirmFun(MsgSeqNo, - State#ch{ - queues_for_msg = - dict:erase(MsgSeqNo, QFM), - unconfirmed = Unconfirmed1}); - _ -> State#ch{queues_for_msg = - dict:store(MsgSeqNo, Qs1, QFM)} - end - end; - false -> - State - end. +confirm(MsgSeqNos, undefined, State = #ch{unconfirmed = UC, + queues_for_msg = QFM}) -> + MsgSeqNos1 = [MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)], + MS = gb_sets:from_list(MsgSeqNos), + QFM1 = dict:filter(fun(M, _Q) -> not(gb_sets:is_element(M, MS)) end, QFM), + send_confirms(MsgSeqNos1, State#ch{unconfirmed = gb_sets:difference(UC, MS), + queues_for_msg = QFM1}); +confirm(MsgSeqNos, QPid, State) -> + {DoneMessages, State1} = + lists:foldl( + fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0, + queues_for_msg = QFM0}}) -> + case gb_sets:is_element(MsgSeqNo, UC0) of + false -> {DMs, State0}; + true -> {ok, Qs} = dict:find(MsgSeqNo, QFM0), + Qs1 = sets:del_element(QPid, Qs), + case sets:size(Qs1) of + 0 -> {[MsgSeqNo | DMs], + State0#ch{ + queues_for_msg = + dict:erase(MsgSeqNo, QFM0), + unconfirmed = + gb_sets:delete(MsgSeqNo, UC0)}}; + _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0), + {DMs, State0#ch{queues_for_msg = QFM1}} + end + end + end, {[], State}, MsgSeqNos), + send_confirms(DoneMessages, State1). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -1010,20 +985,10 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) rabbit_misc:protocol_error( precondition_failed, "cannot switch from tx to confirm mode", []); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = false}) -> - return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, +handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> + return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = true, - confirm_multiple = Multiple}) -> - return_ok(State, NoWait, #'confirm.select_ok'{}); - -handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> - rabbit_misc:protocol_error( - precondition_failed, "cannot change confirm_multiple setting", []); - handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -1253,12 +1218,12 @@ is_message_persistent(Content) -> process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - confirm(MsgSeqNo, undefined, State); + confirm([MsgSeqNo], undefined, State); process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - confirm(MsgSeqNo, undefined, State); + confirm([MsgSeqNo], undefined, State); process_routing_result(routed, [], MsgSeqNo, _, State) -> - confirm(MsgSeqNo, undefined, State); + confirm([MsgSeqNo], undefined, State); process_routing_result(routed, _, undefined, _, State) -> State; process_routing_result(routed, QPids, MsgSeqNo, _, @@ -1272,47 +1237,28 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -start_confirm_timer(State = #ch{confirm_tref = undefined}) -> - {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL, - ?MODULE, flush_confirms, [self()]), - State#ch{confirm_tref = TRef}; -start_confirm_timer(State) -> - State. - -stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> +send_confirms([], State) -> State; -stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#ch{confirm_tref = undefined}. - -internal_flush_confirms(State = #ch{writer_pid = WriterPid, - held_confirms = Cs}) -> - case gb_sets:is_empty(Cs) of - true -> State#ch{confirm_tref = undefined}; - false -> [First | Rest] = gb_sets:to_list(Cs), - {Mult, Inds} = find_consecutive_sequence(First, Rest), - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = Mult, multiple = true}), - ok = lists:foldl( - fun(T, ok) -> rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = T}) - end, ok, Inds), - State#ch{held_confirms = gb_sets:new(), - confirm_tref = undefined} - end. - -%% Find longest sequence of consecutive numbers at the beginning. -find_consecutive_sequence(Last, []) -> - {Last, []}; -find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> - find_consecutive_sequence(N, Ns); -find_consecutive_sequence(Last, Ns) -> - {Last, Ns}. +send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + SCs = lists:usort(Cs), + CutOff = case gb_sets:is_empty(UC) of + true -> lists:last(SCs) + 1; + false -> gb_sets:smallest(UC) + end, + {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), + case Ms of + [] -> ok; + _ -> ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms), + multiple = true}) + end, + ok = lists:foldl(fun(T, ok) -> + rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = T}) + end, ok, Ss), + State. -terminate(State) -> - stop_confirm_timer(State), +terminate(_State) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). |