diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-05-20 14:22:56 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-05-20 14:22:56 +0100 |
commit | 7effbff31ee2cee9b2a543ab10e210e74e80129d (patch) | |
tree | d7e431a690c492a84807fe528a7d97b71fbc7d31 | |
parent | bcd37ba7a6b103cb37686dc65e945a17add6b123 (diff) | |
parent | 9fd568ecdbc48e5d8caae5ef400b139a4848c5e8 (diff) | |
download | rabbitmq-server-7effbff31ee2cee9b2a543ab10e210e74e80129d.tar.gz |
merge bug24118 into default
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 12 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 10 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 49 |
5 files changed, 47 insertions, 43 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index d9296bf6..1c2b94e2 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -66,8 +66,8 @@ -spec(set_ram_duration_target/2 :: (('undefined' | 'infinity' | number()), state()) -> state()). -spec(ram_duration/1 :: (state()) -> {number(), state()}). --spec(needs_idle_timeout/1 :: (state()) -> boolean()). --spec(idle_timeout/1 :: (state()) -> state()). +-spec(needs_timeout/1 :: (state()) -> 'false' | 'timed' | 'idle'). +-spec(timeout/1 :: (state()) -> state()). -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). -spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 110817a9..8091e2c2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -220,9 +220,10 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> ensure_rate_timer( confirm_messages(MsgIds, State#q{ backing_queue_state = BQS1}))), - case BQ:needs_idle_timeout(BQS1) of - true -> {ensure_sync_timer(State1), 0}; - false -> {stop_sync_timer(State1), hibernate} + case BQ:needs_timeout(BQS1) of + false -> {stop_sync_timer(State1), hibernate}; + idle -> {stop_sync_timer(State1), 0 }; + timed -> {ensure_sync_timer(State1), 0 } end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> @@ -661,8 +662,8 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> - run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State). +backing_queue_timeout(State = #q{backing_queue = BQ}) -> + run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1046,7 +1047,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); handle_cast(sync_timeout, State) -> - noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined})); + noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined})); handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -1180,7 +1181,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> end; handle_info(timeout, State) -> - noreply(backing_queue_idle_timeout(State)); + noreply(backing_queue_timeout(State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 0955a080..addaabc5 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -149,15 +149,15 @@ behaviour_info(callbacks) -> %% queue. {ram_duration, 1}, - %% Should 'idle_timeout' be called as soon as the queue process + %% Should 'timeout' be called as soon as the queue process %% can manage (either on an empty mailbox, or when a timer %% fires)? - {needs_idle_timeout, 1}, + {needs_timeout, 1}, - %% Called (eventually) after needs_idle_timeout returns - %% 'true'. Note this may be called more than once for each 'true' - %% returned from needs_idle_timeout. - {idle_timeout, 1}, + %% Called (eventually) after needs_timeout returns 'idle' or + %% 'timed'. Note this may be called more than once for each + %% 'idle' or 'timed' returned from needs_timeout. + {timeout, 1}, %% Called immediately before the queue hibernates. {handle_pre_hibernate, 1}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3726420d..1a37cdff 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2269,10 +2269,10 @@ check_variable_queue_status(VQ0, Props) -> VQ1. variable_queue_wait_for_shuffling_end(VQ) -> - case rabbit_variable_queue:needs_idle_timeout(VQ) of - true -> variable_queue_wait_for_shuffling_end( - rabbit_variable_queue:idle_timeout(VQ)); - false -> VQ + case rabbit_variable_queue:needs_timeout(VQ) of + false -> VQ; + _ -> variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:timeout(VQ)) end. test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> @@ -2300,7 +2300,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), {_Guids, VQ4} = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), - VQ5 = rabbit_variable_queue:idle_timeout(VQ4), + VQ5 = rabbit_variable_queue:timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7a3c17a2..8ac3ad43 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -21,7 +21,7 @@ fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, - needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, + needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/3, discard/3, multiple_routing_keys/0]). @@ -146,7 +146,7 @@ %% any one time. This further smooths the effects of changes to the %% target_ram_count and ensures the queue remains responsive %% even when there is a large amount of IO work to do. The -%% idle_timeout callback is utilised to ensure that conversions are +%% timeout callback is utilised to ensure that conversions are %% done as promptly as possible whilst ensuring the queue remains %% responsive. %% @@ -567,20 +567,22 @@ dropwhile1(Pred, State) -> internal_queue_out( fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> case Pred(MsgProps) of - true -> - {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile1(Pred, State2); - false -> - %% message needs to go back into Q4 (or maybe go - %% in for the first time if it was loaded from - %% Q3). Also the msg contents might not be in - %% RAM, so read them in now - {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = - read_msg(MsgStatus, State1), - {ok, State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4) }} + true -> {_, State2} = internal_fetch(false, MsgStatus, + State1), + dropwhile1(Pred, State2); + false -> {ok, in_r(MsgStatus, State1)} end end, State). +in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, + State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> + true = queue:is_empty(Q4), %% ASSERTION + State #vqstate { + q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), + ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; +in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> + State #vqstate { q4 = queue:in_r(MsgStatus, Q4) }. + fetch(AckRequired, State) -> internal_queue_out( fun(MsgStatus, State1) -> @@ -830,21 +832,22 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_idle_timeout(State = #vqstate { on_sync = OnSync }) -> +needs_timeout(State = #vqstate { on_sync = OnSync }) -> case {OnSync, needs_index_sync(State)} of {?BLANK_SYNC, false} -> - {Res, _State} = reduce_memory_use( - fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State), - Res; + case reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State) of + {true, _State} -> idle; + {false, _State} -> false + end; _ -> - true + timed end. -idle_timeout(State) -> +timeout(State) -> a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> |