summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-14 12:37:46 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-14 12:37:46 +0000
commit4af08b87800ebb8156e5b95e31e0de9bff0a27bc (patch)
treed8cd9ab7dbe5e89e1f8c6caecb5775570b6aca4d /src/rabbit_amqqueue_process.erl
parentd8c97cf013c19cb19ba0d3235b4b030f4ed2690b (diff)
parent6fc0feb3e036f45a226e2e323cb018758a5e8db7 (diff)
downloadrabbitmq-server-4af08b87800ebb8156e5b95e31e0de9bff0a27bc.tar.gz
Merging default into bug23554
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl17
1 files changed, 7 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a7468936..6278d5a1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -258,7 +258,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL,
rabbit_amqqueue, maybe_run_queue_via_backing_queue,
- [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]),
+ [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]),
State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
State.
@@ -549,12 +549,12 @@ deliver_or_enqueue(Delivery, State) ->
{false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
+requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl = TTL}) ->
maybe_run_queue_via_backing_queue(
fun (BQS) ->
{_Guids, BQS1} =
BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
- BQS1
+ {[], BQS1}
end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
@@ -651,12 +651,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {BQS2, State1} =
- case Fun(BQS) of
- {{confirm, Guids}, BQS1} -> {BQS1, confirm_messages(Guids, State)};
- BQS1 -> {BQS1, State}
- end,
- run_message_queue(State1#q{backing_queue_state = BQS2}).
+ {Guids, BQS1} = Fun(BQS),
+ run_message_queue(
+ confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
@@ -1141,7 +1138,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State = #q{backing_queue = BQ}) ->
noreply(maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:idle_timeout(BQS) end, State));
+ fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};