diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-23 14:16:33 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-23 14:16:33 +0000 |
commit | c2cfaef70562ddb690163da116d269f50e1fffaa (patch) | |
tree | 784ca71cbcb453bc1dc26f0fa44f610d247e71c3 | |
parent | 02396f8d21a641b4640657e1203b7c2a343a1c73 (diff) | |
download | rabbitmq-server-c2cfaef70562ddb690163da116d269f50e1fffaa.tar.gz |
remove references to confirm.select{multiple}
-rw-r--r-- | src/rabbit_channel.erl | 120 |
1 files changed, 43 insertions, 77 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index edafd52d..b2b0c4a4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,7 +36,7 @@ -behaviour(gen_server2). -export([start_link/7, do/2, do/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]). +-export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1]). @@ -49,8 +49,7 @@ uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, confirm_multiple, confirm_tref, - held_confirms, unconfirmed, queues_for_msg}). + confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -72,8 +71,6 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(FLUSH_CONFIRMS_INTERVAL, 1000). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -98,7 +95,6 @@ -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). --spec(flush_confirms/1 :: (pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -140,9 +136,6 @@ flushed(Pid, QPid) -> confirm(Pid, MsgSeqNo) -> gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). -flush_confirms(Pid) -> - gen_server2:cast(Pid, flush_confirms). - list() -> pg_local:get_members(rabbit_channels). @@ -193,8 +186,6 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 0, - confirm_multiple = false, - held_confirms = gb_sets:new(), unconfirmed = gb_sets:new(), queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), @@ -292,9 +283,6 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -handle_cast(flush_confirms, State) -> - {noreply, internal_flush_confirms(State)}; - handle_cast({confirm, MsgSeqNo, From}, State) -> {noreply, confirm(MsgSeqNo, From, State)}. @@ -312,16 +300,15 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - State1 = internal_flush_confirms(State), rabbit_event:if_enabled(StatsTimer, fun () -> internal_emit_stats( State, [{idle_since, now()}]) end), StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), - {hibernate, State1#ch{stats_timer = StatsTimer1}}. + {hibernate, State#ch{stats_timer = StatsTimer1}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -477,19 +464,10 @@ confirm(undefined, _QPid, State) -> State; confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> State; -confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> +confirm(MsgSeqNo, QPid, State) -> do_if_unconfirmed(MsgSeqNo, QPid, - fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{ - delivery_tag = MSN}), - State1 - end, State); -confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> - do_if_unconfirmed(MsgSeqNo, QPid, - fun(MSN, State1 = #ch{held_confirms = As}) -> - start_confirm_timer( - State1#ch{held_confirms = gb_sets:add(MSN, As)}) + fun(State0) -> + internal_flush_confirms(State0, gb_sets:singleton(MsgSeqNo)) end, State). do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, @@ -501,16 +479,16 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC), case QPid of undefined -> - ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1}); + ConfirmFun(State#ch{unconfirmed = Unconfirmed1}); _ -> {ok, Qs} = dict:find(MsgSeqNo, QFM), Qs1 = sets:del_element(QPid, Qs), case sets:size(Qs1) of - 0 -> ConfirmFun(MsgSeqNo, - State#ch{ - queues_for_msg = - dict:erase(MsgSeqNo, QFM), - unconfirmed = Unconfirmed1}); + 0 -> ConfirmFun( + State#ch{ + queues_for_msg = + dict:erase(MsgSeqNo, QFM), + unconfirmed = Unconfirmed1}); _ -> State#ch{queues_for_msg = dict:store(MsgSeqNo, Qs1, QFM)} end @@ -998,20 +976,10 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) rabbit_misc:protocol_error( precondition_failed, "cannot switch from tx to confirm mode", []); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = false}) -> - return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, +handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> + return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = true, - confirm_multiple = Multiple}) -> - return_ok(State, NoWait, #'confirm.select_ok'{}); - -handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> - rabbit_misc:protocol_error( - precondition_failed, "cannot change confirm_multiple setting", []); - handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -1260,47 +1228,45 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -start_confirm_timer(State = #ch{confirm_tref = undefined}) -> - {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL, - ?MODULE, flush_confirms, [self()]), - State#ch{confirm_tref = TRef}; -start_confirm_timer(State) -> - State. - -stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> - State; -stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#ch{confirm_tref = undefined}. - -internal_flush_confirms(State = #ch{writer_pid = WriterPid, - held_confirms = Cs}) -> +internal_flush_confirms(State = #ch{writer_pid = WriterPid, + unconfirmed = UC}, Cs) -> case gb_sets:is_empty(Cs) of - true -> State#ch{confirm_tref = undefined}; + true -> State; false -> [First | Rest] = gb_sets:to_list(Cs), - {Mult, Inds} = find_consecutive_sequence(First, Rest), - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = Mult, multiple = true}), + LUC = case gb_sets:size(UC) of + 0 -> gb_sets:largest(Cs) + 1; + _ -> gb_sets:smallest(UC) + end, + Is = case First < LUC of + true -> {Mult, Inds} = + find_consecutive_sequence(LUC, First, + Rest), + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = Mult, + multiple = true}), + Inds; + _ -> [First | Rest] + end, ok = lists:foldl( fun(T, ok) -> rabbit_writer:send_command( WriterPid, #'basic.ack'{delivery_tag = T}) - end, ok, Inds), - State#ch{held_confirms = gb_sets:new(), - confirm_tref = undefined} + end, ok, Is), + State end. -%% Find longest sequence of consecutive numbers at the beginning. -find_consecutive_sequence(Last, []) -> +%% Find longest sequence of consecutive numbers at the beginning with +%% no elements exceeding limit. +find_consecutive_sequence(_Limit, Last, []) -> {Last, []}; -find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> - find_consecutive_sequence(N, Ns); -find_consecutive_sequence(Last, Ns) -> +find_consecutive_sequence(Limit, Last, [N | Ns]) + when N == (Last + 1) andalso N < Limit -> + find_consecutive_sequence(Limit, N, Ns); +find_consecutive_sequence(_Limit, Last, Ns) -> {Last, Ns}. -terminate(State) -> - stop_confirm_timer(State), +terminate(_State) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). |