diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-11-24 12:37:54 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-11-24 12:37:54 +0000 |
commit | f32f7994cdca2ca32165dddb15c346fdfc6fd7ea (patch) | |
tree | 9798d381fb339f5cb57d93b0076a7a4190a45178 | |
parent | 215f40f5fa6ea7564d2ab2d5b4d24a19622544d9 (diff) | |
download | rabbitmq-server-f32f7994cdca2ca32165dddb15c346fdfc6fd7ea.tar.gz |
cosmetic
-rw-r--r-- | src/rabbit_channel.erl | 69 |
1 files changed, 31 insertions, 38 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4695d619..8f11faef 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -104,7 +104,7 @@ -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(flush_multiple_acks/1 :: (pid()) -> 'ok'). --spec(confirm/2 ::(pid(), integer()) -> 'ok'). +-spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). -endif. @@ -279,13 +279,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -handle_cast(flush_multiple_acks, - State = #ch{writer_pid = WriterPid, - held_confirms = As, - unconfirmed = UC}) -> - flush_multiple(WriterPid, As, UC), - {noreply, State#ch{held_confirms = gb_sets:new(), - confirm_tref = undefined}}; +handle_cast(flush_multiple_acks, State) -> + {noreply, flush_multiple(State)}; handle_cast({confirm, MsgSeqNo, From}, State) -> {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}. @@ -304,19 +299,14 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. -handle_pre_hibernate(State = #ch{writer_pid = WriterPid, - held_confirms = As, - stats_timer = StatsTimer, - unconfirmed = UC}) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - flush_multiple(WriterPid, As, UC), + State1 = flush_multiple(State), rabbit_event:if_enabled(StatsTimer, fun() -> - internal_emit_stats(State) + internal_emit_stats(State1) end), StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), - {hibernate, State#ch{held_confirms = gb_sets:new(), - stats_timer = StatsTimer1, - confirm_tref = undefined}}. + {hibernate, State1#ch{stats_timer = StatsTimer1}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -477,7 +467,7 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> - start_ack_timer(State1#ch{held_confirms = gb_sets:add(MSN, As)}) + start_confirm_timer(State1#ch{held_confirms = gb_sets:add(MSN, As)}) end, State). do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, @@ -486,11 +476,11 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, %% clears references to MsgSeqNo and does ConfirmFun case gb_sets:is_element(MsgSeqNo, UC) of true -> + Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC), case QPid of undefined -> ConfirmFun(MsgSeqNo, - State#ch{unconfirmed = - gb_sets:delete(MsgSeqNo, UC)}); + State#ch{unconfirmed = Unconfirmed1}); _ -> {ok, Qs} = dict:find(MsgSeqNo, QFM), Qs1 = sets:del_element(QPid, Qs), @@ -499,8 +489,7 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, State#ch{ queues_for_msg = dict:erase(MsgSeqNo, QFM), - unconfirmed = - gb_sets:delete(MsgSeqNo, UC)}); + unconfirmed = Unconfirmed1}); _ -> State#ch{queues_for_msg = dict:store(MsgSeqNo, Qs1, QFM)} end @@ -557,7 +546,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)), + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, + MsgSeqNo)), State2 = process_routing_result(RoutingRes, DeliveredQPids, MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | @@ -1280,7 +1270,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, end. terminate(State) -> - stop_ack_timer(State), + stop_confirm_timer(State), pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). @@ -1363,35 +1353,38 @@ erase_queue_stats(QPid) -> [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. -start_ack_timer(State = #ch{confirm_tref = undefined}) -> +start_confirm_timer(State = #ch{confirm_tref = undefined}) -> {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL, ?MODULE, flush_multiple_acks, [self()]), State#ch{confirm_tref = TRef}; -start_ack_timer(State) -> +start_confirm_timer(State) -> State. -stop_ack_timer(State = #ch{confirm_tref = undefined}) -> +stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> State; -stop_ack_timer(State = #ch{confirm_tref = TRef}) -> +stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#ch{confirm_tref = undefined}. -flush_multiple(WriterPid, As, NA) -> - case gb_sets:is_empty(As) of - true -> ok; - false -> [First | Rest] = gb_sets:to_list(As), +flush_multiple(State = #ch{writer_pid = WriterPid, + held_confirms = Cs, + unconfirmed = UC}) -> + case gb_sets:is_empty(Cs) of + true -> State; + false -> [First | Rest] = gb_sets:to_list(Cs), [rabbit_writer:send_command(WriterPid, - #'basic.ack'{delivery_tag = A}) || - A <- case Rest of + #'basic.ack'{delivery_tag = T}) || + T <- case Rest of [] -> [First]; _ -> flush_multiple( First, Rest, WriterPid, - case gb_sets:is_empty(NA) of - false -> gb_sets:smallest(NA); - true -> gb_sets:largest(As) + 1 + case gb_sets:is_empty(UC) of + false -> gb_sets:smallest(UC); + true -> gb_sets:largest(Cs) + 1 end) end], - ok + State#ch{held_confirms = gb_sets:new(), + confirm_tref = undefined} end. flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) -> |