diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-07 23:31:31 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-07 23:31:31 +0000 |
commit | 591199daa9a5d346e17f1f5053d4696edd096ff7 (patch) | |
tree | 08f7841ceede30101666f8071e4a14c7574d43fa | |
parent | 37ee768e978377a87bc5d17d072d51adc97547d7 (diff) | |
download | rabbitmq-server-591199daa9a5d346e17f1f5053d4696edd096ff7.tar.gz |
make confirm timeout configurable
-rw-r--r-- | src/rabbit_channel.erl | 40 |
1 files changed, 17 insertions, 23 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0c8ad00a..5bf545a1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,7 +49,7 @@ uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, confirm_multiple, confirm_tref, + confirm_enabled, publish_seqno, confirm_duration, confirm_tref, held_confirms, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -72,8 +72,6 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -192,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 0, - confirm_multiple = false, + confirm_duration = 0, held_confirms = gb_sets:new(), unconfirmed = gb_sets:new(), queues_for_msg = dict:new()}, @@ -459,7 +457,7 @@ send_or_enqueue_ack(undefined, _QPid, State) -> State; send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> State; -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> +send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_duration = 0}) -> do_if_unconfirmed(MsgSeqNo, QPid, fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command( @@ -467,11 +465,12 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> delivery_tag = MSN}), State1 end, State); -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> +send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_duration = CD}) -> do_if_unconfirmed(MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> start_confirm_timer( - State1#ch{held_confirms = gb_sets:add(MSN, As)}) + State1#ch{held_confirms = gb_sets:add(MSN, As)}, + CD) end, State). do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, @@ -978,20 +977,15 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) rabbit_misc:protocol_error( precondition_failed, "cannot switch from tx to confirm mode", []); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = false}) -> - return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, +handle_method(#'confirm.select'{batch_duration = Duration, nowait = NoWait}, + _, State = #ch{confirm_tref = TRef}) -> + State1 = case TRef =:= undefined of + true -> State; + false -> flush_multiple(State) + end, + return_ok(State1#ch{confirm_enabled = true, confirm_duration = Duration}, NoWait, #'confirm.select_ok'{}); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = true, - confirm_multiple = Multiple}) -> - return_ok(State, NoWait, #'confirm.select_ok'{}); - -handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> - rabbit_misc:protocol_error( - precondition_failed, "cannot change confirm_multiple setting", []); - handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -1344,11 +1338,11 @@ erase_queue_stats(QPid) -> [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. -start_confirm_timer(State = #ch{confirm_tref = undefined}) -> - {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL, - ?MODULE, flush_multiple_acks, [self()]), +start_confirm_timer(State = #ch{confirm_tref = undefined}, Duration) -> + {ok, TRef} = timer:apply_after(Duration, ?MODULE, + flush_multiple_acks, [self()]), State#ch{confirm_tref = TRef}; -start_confirm_timer(State) -> +start_confirm_timer(State, _) -> State. stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> |