summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-07 23:31:31 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-07 23:31:31 +0000
commit591199daa9a5d346e17f1f5053d4696edd096ff7 (patch)
tree08f7841ceede30101666f8071e4a14c7574d43fa
parent37ee768e978377a87bc5d17d072d51adc97547d7 (diff)
downloadrabbitmq-server-591199daa9a5d346e17f1f5053d4696edd096ff7.tar.gz
make confirm timeout configurable
-rw-r--r--src/rabbit_channel.erl40
1 files changed, 17 insertions, 23 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0c8ad00a..5bf545a1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,7 +49,7 @@
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, confirm_multiple, confirm_tref,
+ confirm_enabled, publish_seqno, confirm_duration, confirm_tref,
held_confirms, unconfirmed, queues_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -72,8 +72,6 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -192,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 0,
- confirm_multiple = false,
+ confirm_duration = 0,
held_confirms = gb_sets:new(),
unconfirmed = gb_sets:new(),
queues_for_msg = dict:new()},
@@ -459,7 +457,7 @@ send_or_enqueue_ack(undefined, _QPid, State) ->
State;
send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
State;
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
+send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_duration = 0}) ->
do_if_unconfirmed(MsgSeqNo, QPid,
fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(
@@ -467,11 +465,12 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
delivery_tag = MSN}),
State1
end, State);
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
+send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_duration = CD}) ->
do_if_unconfirmed(MsgSeqNo, QPid,
fun(MSN, State1 = #ch{held_confirms = As}) ->
start_confirm_timer(
- State1#ch{held_confirms = gb_sets:add(MSN, As)})
+ State1#ch{held_confirms = gb_sets:add(MSN, As)},
+ CD)
end, State).
do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
@@ -978,20 +977,15 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
rabbit_misc:protocol_error(
precondition_failed, "cannot switch from tx to confirm mode", []);
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = false}) ->
- return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple},
+handle_method(#'confirm.select'{batch_duration = Duration, nowait = NoWait},
+ _, State = #ch{confirm_tref = TRef}) ->
+ State1 = case TRef =:= undefined of
+ true -> State;
+ false -> flush_multiple(State)
+ end,
+ return_ok(State1#ch{confirm_enabled = true, confirm_duration = Duration},
NoWait, #'confirm.select_ok'{});
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = true,
- confirm_multiple = Multiple}) ->
- return_ok(State, NoWait, #'confirm.select_ok'{});
-
-handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot change confirm_multiple setting", []);
-
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->
LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
@@ -1344,11 +1338,11 @@ erase_queue_stats(QPid) ->
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
-start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL,
- ?MODULE, flush_multiple_acks, [self()]),
+start_confirm_timer(State = #ch{confirm_tref = undefined}, Duration) ->
+ {ok, TRef} = timer:apply_after(Duration, ?MODULE,
+ flush_multiple_acks, [self()]),
State#ch{confirm_tref = TRef};
-start_confirm_timer(State) ->
+start_confirm_timer(State, _) ->
State.
stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->