summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-01 17:02:26 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-01 17:02:26 +0100
commitd70eb76abf5266b503996f26c25c4117fc2b0344 (patch)
tree8c14cba88f07ca292e297723bac20557764e9cad
parent3d87dab43b9833cc654715239098fdd0348619a6 (diff)
downloadrabbitmq-server-bug25199.tar.gz
get rid of all but one use of run_backing_queue and inline the latterbug25199
-rw-r--r--src/rabbit_amqqueue_process.erl30
1 files changed, 14 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 10ac5bea..33fd36db 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -578,14 +578,13 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
State2#q{backing_queue_state = BQS1})
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
- run_backing_queue(BQ, fun (M, BQS) ->
- {_MsgIds, BQS1} = M:requeue(AckTags, BQS),
- BQS1
- end, State).
-
-fetch(AckRequired, State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+requeue_and_run(AckTags, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ run_message_queue(State#q{backing_queue_state = BQS1}).
+
+fetch(AckRequired, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
{Result, State#q{backing_queue_state = BQS1}}.
@@ -679,12 +678,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-backing_queue_timeout(State = #q{backing_queue = BQ}) ->
- run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
-
-run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
+backing_queue_timeout(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State#q{backing_queue_state = BQ:timeout(BQS)}.
subtract_acks(ChPid, AckTags, State, Fun) ->
case lookup_ch(ChPid) of
@@ -1173,8 +1169,10 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
noreply(State);
-handle_cast({run_backing_queue, Mod, Fun}, State) ->
- noreply(run_backing_queue(Mod, Fun, State));
+handle_cast({run_backing_queue, Mod, Fun},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ noreply(run_message_queue(
+ State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
State = #q{senders = Senders}) ->