summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-11-24 12:37:54 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-11-24 12:37:54 +0000
commitf32f7994cdca2ca32165dddb15c346fdfc6fd7ea (patch)
tree9798d381fb339f5cb57d93b0076a7a4190a45178
parent215f40f5fa6ea7564d2ab2d5b4d24a19622544d9 (diff)
downloadrabbitmq-server-f32f7994cdca2ca32165dddb15c346fdfc6fd7ea.tar.gz
cosmetic
-rw-r--r--src/rabbit_channel.erl69
1 files changed, 31 insertions, 38 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4695d619..8f11faef 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -104,7 +104,7 @@
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(flush_multiple_acks/1 :: (pid()) -> 'ok').
--spec(confirm/2 ::(pid(), integer()) -> 'ok').
+-spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
-endif.
@@ -279,13 +279,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-handle_cast(flush_multiple_acks,
- State = #ch{writer_pid = WriterPid,
- held_confirms = As,
- unconfirmed = UC}) ->
- flush_multiple(WriterPid, As, UC),
- {noreply, State#ch{held_confirms = gb_sets:new(),
- confirm_tref = undefined}};
+handle_cast(flush_multiple_acks, State) ->
+ {noreply, flush_multiple(State)};
handle_cast({confirm, MsgSeqNo, From}, State) ->
{noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}.
@@ -304,19 +299,14 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1), hibernate}.
-handle_pre_hibernate(State = #ch{writer_pid = WriterPid,
- held_confirms = As,
- stats_timer = StatsTimer,
- unconfirmed = UC}) ->
+handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- flush_multiple(WriterPid, As, UC),
+ State1 = flush_multiple(State),
rabbit_event:if_enabled(StatsTimer, fun() ->
- internal_emit_stats(State)
+ internal_emit_stats(State1)
end),
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
- {hibernate, State#ch{held_confirms = gb_sets:new(),
- stats_timer = StatsTimer1,
- confirm_tref = undefined}}.
+ {hibernate, State1#ch{stats_timer = StatsTimer1}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -477,7 +467,7 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
do_if_unconfirmed(
MsgSeqNo, QPid,
fun(MSN, State1 = #ch{held_confirms = As}) ->
- start_ack_timer(State1#ch{held_confirms = gb_sets:add(MSN, As)})
+ start_confirm_timer(State1#ch{held_confirms = gb_sets:add(MSN, As)})
end, State).
do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
@@ -486,11 +476,11 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
%% clears references to MsgSeqNo and does ConfirmFun
case gb_sets:is_element(MsgSeqNo, UC) of
true ->
+ Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC),
case QPid of
undefined ->
ConfirmFun(MsgSeqNo,
- State#ch{unconfirmed =
- gb_sets:delete(MsgSeqNo, UC)});
+ State#ch{unconfirmed = Unconfirmed1});
_ ->
{ok, Qs} = dict:find(MsgSeqNo, QFM),
Qs1 = sets:del_element(QPid, Qs),
@@ -499,8 +489,7 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
State#ch{
queues_for_msg =
dict:erase(MsgSeqNo, QFM),
- unconfirmed =
- gb_sets:delete(MsgSeqNo, UC)});
+ unconfirmed = Unconfirmed1});
_ -> State#ch{queues_for_msg =
dict:store(MsgSeqNo, Qs1, QFM)}
end
@@ -557,7 +546,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)),
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
+ MsgSeqNo)),
State2 = process_routing_result(RoutingRes, DeliveredQPids,
MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
@@ -1280,7 +1270,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
end.
terminate(State) ->
- stop_ack_timer(State),
+ stop_confirm_timer(State),
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).
@@ -1363,35 +1353,38 @@ erase_queue_stats(QPid) ->
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
-start_ack_timer(State = #ch{confirm_tref = undefined}) ->
+start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
{ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL,
?MODULE, flush_multiple_acks, [self()]),
State#ch{confirm_tref = TRef};
-start_ack_timer(State) ->
+start_confirm_timer(State) ->
State.
-stop_ack_timer(State = #ch{confirm_tref = undefined}) ->
+stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
State;
-stop_ack_timer(State = #ch{confirm_tref = TRef}) ->
+stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#ch{confirm_tref = undefined}.
-flush_multiple(WriterPid, As, NA) ->
- case gb_sets:is_empty(As) of
- true -> ok;
- false -> [First | Rest] = gb_sets:to_list(As),
+flush_multiple(State = #ch{writer_pid = WriterPid,
+ held_confirms = Cs,
+ unconfirmed = UC}) ->
+ case gb_sets:is_empty(Cs) of
+ true -> State;
+ false -> [First | Rest] = gb_sets:to_list(Cs),
[rabbit_writer:send_command(WriterPid,
- #'basic.ack'{delivery_tag = A}) ||
- A <- case Rest of
+ #'basic.ack'{delivery_tag = T}) ||
+ T <- case Rest of
[] -> [First];
_ -> flush_multiple(
First, Rest, WriterPid,
- case gb_sets:is_empty(NA) of
- false -> gb_sets:smallest(NA);
- true -> gb_sets:largest(As) + 1
+ case gb_sets:is_empty(UC) of
+ false -> gb_sets:smallest(UC);
+ true -> gb_sets:largest(Cs) + 1
end)
end],
- ok
+ State#ch{held_confirms = gb_sets:new(),
+ confirm_tref = undefined}
end.
flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) ->