summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-20 20:47:45 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-20 20:47:45 +0000
commit9d115a8217adba13989efb9fb20660e6ea39241e (patch)
treecb5d9e6bef45ac654792b14f2605d83b0a2479fb
parente7cc227a8f508bd8e2845be6e178de46617bef6f (diff)
downloadrabbitmq-server-9d115a8217adba13989efb9fb20660e6ea39241e.tar.gz
refactor
it's convenient for callers to have maybe_send_drained thread through the state
-rw-r--r--src/rabbit_amqqueue_process.erl15
1 files changed, 6 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e30a9839..6de1d0a4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -424,7 +424,8 @@ maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
true -> [send_drained(C) || C <- all_ch_record()];
false -> ok
- end.
+ end,
+ State.
send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
case rabbit_limiter:drained(Limiter) of
@@ -598,15 +599,13 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ,
WasEmpty = BQ:is_empty(BQS),
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
- maybe_send_drained(WasEmpty, State1),
- run_message_queue(State1).
+ run_message_queue(maybe_send_drained(WasEmpty, State1)).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
- maybe_send_drained(Result =:= empty, State1),
- {Result, State1}.
+ {Result, maybe_send_drained(Result =:= empty, State1)}.
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
@@ -1211,8 +1210,7 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Count, BQS1} = BQ:purge(BQS),
State1 = State#q{backing_queue_state = BQS1},
- maybe_send_drained(Count =:= 0, State1),
- reply({ok, Count}, State1);
+ reply({ok, Count}, maybe_send_drained(Count =:= 0, State1));
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
@@ -1402,8 +1400,7 @@ handle_info(maybe_expire, State) ->
handle_info(drop_expired, State) ->
WasEmpty = is_empty(State),
State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}),
- maybe_send_drained(WasEmpty, State1),
- noreply(State1);
+ noreply(maybe_send_drained(WasEmpty, State1));
handle_info(emit_stats, State) ->
emit_stats(State),