summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-23 14:16:33 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-23 14:16:33 +0000
commitc2cfaef70562ddb690163da116d269f50e1fffaa (patch)
tree784ca71cbcb453bc1dc26f0fa44f610d247e71c3
parent02396f8d21a641b4640657e1203b7c2a343a1c73 (diff)
downloadrabbitmq-server-c2cfaef70562ddb690163da116d269f50e1fffaa.tar.gz
remove references to confirm.select{multiple}
-rw-r--r--src/rabbit_channel.erl120
1 files changed, 43 insertions, 77 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index edafd52d..b2b0c4a4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]).
+-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1]).
@@ -49,8 +49,7 @@
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, confirm_multiple, confirm_tref,
- held_confirms, unconfirmed, queues_for_msg}).
+ confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -72,8 +71,6 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(FLUSH_CONFIRMS_INTERVAL, 1000).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -98,7 +95,6 @@
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
--spec(flush_confirms/1 :: (pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -140,9 +136,6 @@ flushed(Pid, QPid) ->
confirm(Pid, MsgSeqNo) ->
gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
-flush_confirms(Pid) ->
- gen_server2:cast(Pid, flush_confirms).
-
list() ->
pg_local:get_members(rabbit_channels).
@@ -193,8 +186,6 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 0,
- confirm_multiple = false,
- held_confirms = gb_sets:new(),
unconfirmed = gb_sets:new(),
queues_for_msg = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -292,9 +283,6 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-handle_cast(flush_confirms, State) ->
- {noreply, internal_flush_confirms(State)};
-
handle_cast({confirm, MsgSeqNo, From}, State) ->
{noreply, confirm(MsgSeqNo, From, State)}.
@@ -312,16 +300,15 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1), hibernate}.
-handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
+handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- State1 = internal_flush_confirms(State),
rabbit_event:if_enabled(StatsTimer,
fun () ->
internal_emit_stats(
State, [{idle_since, now()}])
end),
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
- {hibernate, State1#ch{stats_timer = StatsTimer1}}.
+ {hibernate, State#ch{stats_timer = StatsTimer1}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -477,19 +464,10 @@ confirm(undefined, _QPid, State) ->
State;
confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
State;
-confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
+confirm(MsgSeqNo, QPid, State) ->
do_if_unconfirmed(MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{
- delivery_tag = MSN}),
- State1
- end, State);
-confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
- do_if_unconfirmed(MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{held_confirms = As}) ->
- start_confirm_timer(
- State1#ch{held_confirms = gb_sets:add(MSN, As)})
+ fun(State0) ->
+ internal_flush_confirms(State0, gb_sets:singleton(MsgSeqNo))
end, State).
do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
@@ -501,16 +479,16 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC),
case QPid of
undefined ->
- ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1});
+ ConfirmFun(State#ch{unconfirmed = Unconfirmed1});
_ ->
{ok, Qs} = dict:find(MsgSeqNo, QFM),
Qs1 = sets:del_element(QPid, Qs),
case sets:size(Qs1) of
- 0 -> ConfirmFun(MsgSeqNo,
- State#ch{
- queues_for_msg =
- dict:erase(MsgSeqNo, QFM),
- unconfirmed = Unconfirmed1});
+ 0 -> ConfirmFun(
+ State#ch{
+ queues_for_msg =
+ dict:erase(MsgSeqNo, QFM),
+ unconfirmed = Unconfirmed1});
_ -> State#ch{queues_for_msg =
dict:store(MsgSeqNo, Qs1, QFM)}
end
@@ -998,20 +976,10 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
rabbit_misc:protocol_error(
precondition_failed, "cannot switch from tx to confirm mode", []);
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = false}) ->
- return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple},
+handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
+ return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = true,
- confirm_multiple = Multiple}) ->
- return_ok(State, NoWait, #'confirm.select_ok'{});
-
-handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot change confirm_multiple setting", []);
-
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->
LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
@@ -1260,47 +1228,45 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL,
- ?MODULE, flush_confirms, [self()]),
- State#ch{confirm_tref = TRef};
-start_confirm_timer(State) ->
- State.
-
-stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- State;
-stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#ch{confirm_tref = undefined}.
-
-internal_flush_confirms(State = #ch{writer_pid = WriterPid,
- held_confirms = Cs}) ->
+internal_flush_confirms(State = #ch{writer_pid = WriterPid,
+ unconfirmed = UC}, Cs) ->
case gb_sets:is_empty(Cs) of
- true -> State#ch{confirm_tref = undefined};
+ true -> State;
false -> [First | Rest] = gb_sets:to_list(Cs),
- {Mult, Inds} = find_consecutive_sequence(First, Rest),
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = Mult, multiple = true}),
+ LUC = case gb_sets:size(UC) of
+ 0 -> gb_sets:largest(Cs) + 1;
+ _ -> gb_sets:smallest(UC)
+ end,
+ Is = case First < LUC of
+ true -> {Mult, Inds} =
+ find_consecutive_sequence(LUC, First,
+ Rest),
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = Mult,
+ multiple = true}),
+ Inds;
+ _ -> [First | Rest]
+ end,
ok = lists:foldl(
fun(T, ok) -> rabbit_writer:send_command(
WriterPid,
#'basic.ack'{delivery_tag = T})
- end, ok, Inds),
- State#ch{held_confirms = gb_sets:new(),
- confirm_tref = undefined}
+ end, ok, Is),
+ State
end.
-%% Find longest sequence of consecutive numbers at the beginning.
-find_consecutive_sequence(Last, []) ->
+%% Find longest sequence of consecutive numbers at the beginning with
+%% no elements exceeding limit.
+find_consecutive_sequence(_Limit, Last, []) ->
{Last, []};
-find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
- find_consecutive_sequence(N, Ns);
-find_consecutive_sequence(Last, Ns) ->
+find_consecutive_sequence(Limit, Last, [N | Ns])
+ when N == (Last + 1) andalso N < Limit ->
+ find_consecutive_sequence(Limit, N, Ns);
+find_consecutive_sequence(_Limit, Last, Ns) ->
{Last, Ns}.
-terminate(State) ->
- stop_confirm_timer(State),
+terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).