diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-20 20:47:45 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-20 20:47:45 +0000 |
commit | 9d115a8217adba13989efb9fb20660e6ea39241e (patch) | |
tree | cb5d9e6bef45ac654792b14f2605d83b0a2479fb /src | |
parent | e7cc227a8f508bd8e2845be6e178de46617bef6f (diff) | |
download | rabbitmq-server-9d115a8217adba13989efb9fb20660e6ea39241e.tar.gz |
refactor
it's convenient for callers to have maybe_send_drained thread through
the state
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 |
1 files changed, 6 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e30a9839..6de1d0a4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -424,7 +424,8 @@ maybe_send_drained(WasEmpty, State) -> case (not WasEmpty) andalso is_empty(State) of true -> [send_drained(C) || C <- all_ch_record()]; false -> ok - end. + end, + State. send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> case rabbit_limiter:drained(Limiter) of @@ -598,15 +599,13 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ, WasEmpty = BQ:is_empty(BQS), {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), - maybe_send_drained(WasEmpty, State1), - run_message_queue(State1). + run_message_queue(maybe_send_drained(WasEmpty, State1)). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), - maybe_send_drained(Result =:= empty, State1), - {Result, State1}. + {Result, maybe_send_drained(Result =:= empty, State1)}. ack(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, @@ -1211,8 +1210,7 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Count, BQS1} = BQ:purge(BQS), State1 = State#q{backing_queue_state = BQS1}, - maybe_send_drained(Count =:= 0, State1), - reply({ok, Count}, State1); + reply({ok, Count}, maybe_send_drained(Count =:= 0, State1)); handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), @@ -1402,8 +1400,7 @@ handle_info(maybe_expire, State) -> handle_info(drop_expired, State) -> WasEmpty = is_empty(State), State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}), - maybe_send_drained(WasEmpty, State1), - noreply(State1); + noreply(maybe_send_drained(WasEmpty, State1)); handle_info(emit_stats, State) -> emit_stats(State), |