diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-06 17:29:02 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-06 17:29:02 +0100 |
commit | ba2c06a682f5b4e98f9fa66df094f98d52724246 (patch) | |
tree | 5bde2d8749f18f84dbd00a1d32cf348bbd644399 | |
parent | 7dab5238a12bffca25dcc6d6e932876de8127a28 (diff) | |
parent | 53c18464e9f0355f1b7e96ca9ecbfb5edf51cf96 (diff) | |
download | rabbitmq-server-ba2c06a682f5b4e98f9fa66df094f98d52724246.tar.gz |
merge bug21673 into bug22896
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 17 | ||||
-rw-r--r-- | src/rabbit_invariable_queue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 15 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 60 |
6 files changed, 66 insertions, 44 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 55cd126e..47748bdb 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -57,7 +57,7 @@ -spec(set_ram_duration_target/2 :: (('undefined' | 'infinity' | number()), state()) -> state()). -spec(ram_duration/1 :: (state()) -> {number(), state()}). --spec(needs_sync/1 :: (state()) -> boolean()). --spec(sync/1 :: (state()) -> state()). +-spec(needs_idle_timeout/1 :: (state()) -> boolean()). +-spec(idle_timeout/1 :: (state()) -> state()). -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3bf48b4c..ec59095d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -179,7 +179,7 @@ noreply(NewState) -> next_state(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_rate_timer(State), - case BQ:needs_sync(BQS)of + case BQ:needs_idle_timeout(BQS)of true -> {ensure_sync_timer(State1), 0}; false -> {stop_sync_timer(State1), hibernate} end. @@ -188,7 +188,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, rabbit_amqqueue, maybe_run_queue_via_backing_queue, - [self(), fun (BQS) -> BQ:sync(BQS) end]), + [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]), State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> State. @@ -822,7 +822,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:sync(BQS) end, State)); + fun (BQS) -> BQ:idle_timeout(BQS) end, State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 432d6290..b76ae11e 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -113,14 +113,15 @@ behaviour_info(callbacks) -> %% queue. {ram_duration, 1}, - %% Should 'sync' be called as soon as the queue process can - %% manage (either on an empty mailbox, or when a timer fires)? - {needs_sync, 1}, - - %% Called (eventually) after needs_sync returns 'true'. Note this - %% may be called more than once for each 'true' returned from - %% needs_sync. - {sync, 1}, + %% Should 'idle_timeout' be called as soon as the queue process + %% can manage (either on an empty mailbox, or when a timer + %% fires)? + {needs_idle_timeout, 1}, + + %% Called (eventually) after needs_idle_timeout returns + %% 'true'. Note this may be called more than once for each 'true' + %% returned from needs_idle_timeout. + {idle_timeout, 1}, %% Called immediately before the queue hibernates. {handle_pre_hibernate, 1}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index a7ca20c8..e6bd11e3 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -34,8 +34,8 @@ -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1, - handle_pre_hibernate/1, status/1]). + set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, + idle_timeout/1, handle_pre_hibernate/1, status/1]). -export([start/1]). @@ -197,9 +197,9 @@ set_ram_duration_target(_DurationTarget, State) -> State. ram_duration(State) -> {0, State}. -needs_sync(_State) -> false. +needs_idle_timeout(_State) -> false. -sync(State) -> State. +idle_timeout(State) -> State. handle_pre_hibernate(State) -> State. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b28dd839..dd6a9089 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1845,7 +1845,8 @@ test_variable_queue_partial_segments_delta_thing() -> VQ0 = fresh_variable_queue(), VQ1 = variable_queue_publish(true, SegmentSize + HalfSegment, VQ0), {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1), - VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2), + VQ3 = variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:set_ram_duration_target(0, VQ2)), %% one segment in q3 as betas, and half a segment in delta S3 = rabbit_variable_queue:status(VQ3), io:format("~p~n", [S3]), @@ -1854,7 +1855,8 @@ test_variable_queue_partial_segments_delta_thing() -> assert_prop(S3, q3, SegmentSize), assert_prop(S3, len, SegmentSize + HalfSegment), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), - VQ5 = variable_queue_publish(true, 1, VQ4), + VQ5 = variable_queue_wait_for_shuffling_end( + variable_queue_publish(true, 1, VQ4)), %% should have 1 alpha, but it's in the same segment as the deltas S5 = rabbit_variable_queue:status(VQ5), io:format("~p~n", [S5]), @@ -1881,6 +1883,13 @@ test_variable_queue_partial_segments_delta_thing() -> passed. +variable_queue_wait_for_shuffling_end(VQ) -> + case rabbit_variable_queue:needs_idle_timeout(VQ) of + true -> variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:idle_timeout(VQ)); + false -> VQ + end. + test_queue_recover() -> Count = 2*rabbit_queue_index:next_segment_boundary(0), TxID = rabbit_guid:guid(), @@ -1939,7 +1948,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere() -> VQa2 = variable_queue_publish(false, 4, VQa1), {VQa3, AckTags} = variable_queue_fetch(2, false, false, 4, VQa2), VQa4 = rabbit_variable_queue:requeue(AckTags, VQa3), - VQa5 = rabbit_variable_queue:sync(VQa4), + VQa5 = rabbit_variable_queue:idle_timeout(VQa4), _VQa6 = rabbit_variable_queue:terminate(VQa5), VQa7 = rabbit_variable_queue:init(test_queue(), true, true), {empty, VQa8} = rabbit_variable_queue:fetch(false, VQa7), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8b9d17e7..5893385a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -36,7 +36,8 @@ tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, - needs_sync/1, sync/1, handle_pre_hibernate/1, status/1]). + needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, + status/1]). -export([start/1]). @@ -221,7 +222,7 @@ %% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't %% write more - we can always come back on the next publish to do %% more. --define(RAM_INDEX_BATCH_SIZE, 64). +-define(IO_BATCH_SIZE, 64). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). @@ -633,10 +634,16 @@ ram_duration(State = #vqstate { egress_rate = Egress, out_counter = 0, ram_msg_count_prev = RamMsgCount })}. -needs_sync(#vqstate { on_sync = {_, _, []} }) -> false; -needs_sync(_) -> true. +needs_idle_timeout(#vqstate { on_sync = {_, _, SFuns}, + target_ram_msg_count = TargetRamMsgCount, + ram_msg_count = RamMsgCount }) + when SFuns =/= [] orelse RamMsgCount > TargetRamMsgCount -> + true; +needs_idle_timeout(State = #vqstate { ram_index_count = RamIndexCount }) -> + Permitted = permitted_ram_index_count(State), + Permitted =/= infinity andalso RamIndexCount > Permitted. -sync(State) -> a(tx_commit_index(State)). +idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -671,7 +678,6 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, persistent_count = PersistentCount, - target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }) -> E1 = queue:is_empty(Q1), @@ -679,13 +685,11 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, ED = Delta#delta.count == 0, E3 = bpqueue:is_empty(Q3), E4 = queue:is_empty(Q4), - TZ = TargetRamMsgCount == 0, LZ = Len == 0, true = E1 or not E3, true = E2 or not ED, true = ED or not E3, - true = (E1 and E2 and E4) or not TZ, true = LZ == (E3 and E4), true = Len >= 0, @@ -1117,19 +1121,21 @@ reduce_memory_use(State = #vqstate { when TargetRamMsgCount >= RamMsgCount -> limit_ram_index(State); reduce_memory_use(State = #vqstate { + ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount }) -> - State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)), + Reduction = lists:min([RamMsgCount - TargetRamMsgCount, ?IO_BATCH_SIZE]), + { Reduction1, State1} = maybe_push_q1_to_betas(Reduction, State), + {_Reduction2, State2} = maybe_push_q4_to_betas(Reduction1, State1), case TargetRamMsgCount of - 0 -> push_betas_to_deltas(State1); - _ -> limit_ram_index(State1) + 0 -> push_betas_to_deltas(State2); + _ -> limit_ram_index(State2) end. limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) -> Permitted = permitted_ram_index_count(State), if Permitted =/= infinity andalso RamIndexCount > Permitted -> - Reduction = lists:min([RamIndexCount - Permitted, - ?RAM_INDEX_BATCH_SIZE]), - case Reduction < ?RAM_INDEX_BATCH_SIZE of + Reduction = lists:min([RamIndexCount - Permitted, ?IO_BATCH_SIZE]), + case Reduction < ?IO_BATCH_SIZE of true -> State; false -> #vqstate { q2 = Q2, q3 = Q3, index_state = IndexState } = State, @@ -1227,7 +1233,9 @@ maybe_deltas_to_betas(State = #vqstate { end end. -maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) -> +maybe_push_q1_to_betas(0, State) -> + {0, State}; +maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> maybe_push_alphas_to_betas( fun queue:out/1, fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, @@ -1238,26 +1246,30 @@ maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) -> Q1a, State1 = #vqstate { q2 = Q2 }) -> State1 #vqstate { q1 = Q1a, q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) } - end, Q1, State). + end, Quota, Q1, State). -maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) -> +maybe_push_q4_to_betas(0, State) -> + {0, State}; +maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas( fun queue:out_r/1, fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, Q4a, State1 = #vqstate { q3 = Q3 }) -> State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), q4 = Q4a } - end, Q4, State). + end, Quota, Q4, State). -maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, +maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, State = #vqstate { ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount }) - when TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount -> - State; -maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> + when Quota =:= 0 orelse + TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount -> + {Quota, State}; +maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case Generator(Q) of - {empty, _Q} -> State; + {empty, _Q} -> + {Quota, State}; {{value, MsgStatus}, Qa} -> {MsgStatus1 = #msg_status { msg_on_disk = true, index_on_disk = IndexOnDisk }, @@ -1268,7 +1280,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, ram_index_count = RamIndexCount1 }, - maybe_push_alphas_to_betas(Generator, Consumer, Qa, + maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, Consumer(MsgStatus2, Qa, State2)) end. |