diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-03-03 16:29:55 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-03-03 16:29:55 +0000 |
commit | 5d3fe47e52f64cd8e7f0f7d0bdb854b47b438da9 (patch) | |
tree | 79b01f374acc623340fcb6ce820b5b4e5509e70b | |
parent | ebb3d7e0f5157a19e86c862c924d3ab67ff301df (diff) | |
download | rabbitmq-server-5d3fe47e52f64cd8e7f0f7d0bdb854b47b438da9.tar.gz |
decouple syncing from paging
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 31 |
2 files changed, 14 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9d240a3f..bd65bc4b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1233,9 +1233,10 @@ handle_info(timeout, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; -handle_info({bump_credit, Msg}, State) -> +handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> credit_flow:handle_bump_msg(Msg), - noreply(backing_queue_timeout(State)); + noreply(State#q{backing_queue_state = BQ:resume(BQS)}); handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c8cafa57..c1a18a74 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,10 +18,10 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, - dropwhile/2, fetchwhile/4, - fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, - is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, msg_rates/1, + dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, + ackfold/4, fold/3, len/1, is_empty/1, depth/1, + set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, + handle_pre_hibernate/1, resume/1, msg_rates/1, status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -813,30 +813,21 @@ ram_duration(State) -> {Duration, State1}. -needs_timeout(State = #vqstate { index_state = IndexState, - target_ram_count = TargetRamCount }) -> +needs_timeout(#vqstate { index_state = IndexState }) -> case rabbit_queue_index:needs_sync(IndexState) of - confirms -> timed; - other -> idle; - false when TargetRamCount == infinity -> false; - false -> case reduce_memory_use( - fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State) of - {true, _State} -> idle; - {false, _State} -> false - end + confirms -> timed; + other -> idle; + false -> false end. timeout(State = #vqstate { index_state = IndexState }) -> - IndexState1 = rabbit_queue_index:sync(IndexState), - State1 = State #vqstate { index_state = IndexState1 }, - a(reduce_memory_use(State1)). + State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }. handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. +resume(State) -> a(reduce_memory_use(State)). + msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, out = AvgEgressRate } }) -> {AvgIngressRate, AvgEgressRate}. |