summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-11-22 15:47:04 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-11-22 15:47:04 +0000
commit1c86935bb0d27011e20e68169cc9fd5064f11921 (patch)
tree084c3aa0764f9cde99fd8686e0e7a434c35958ec
parent6e797a7e99b045a84e737aee20b608d7f6da3d70 (diff)
parentd7bfdd0573b7648bf521ab8300be42782f873c56 (diff)
downloadrabbitmq-server-1c86935bb0d27011e20e68169cc9fd5064f11921.tar.gz
Merged default
-rw-r--r--src/rabbit_amqqueue_process.erl32
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_tests.erl3
-rw-r--r--src/rabbit_variable_queue.erl6
5 files changed, 30 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index dc258fa6..b3c44a50 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1226,7 +1226,7 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end,
+ BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end,
BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
@@ -1278,18 +1278,24 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
case rabbit_exchange:lookup(XName) of
{ok, X} ->
- noreply(lists:foldl(
- fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo,
- unconfirmed = UC,
- queue_monitors = QMon}) ->
- QPids = dead_letter_publish(Msg, Reason, X,
- State1),
- UC1 = dtree:insert(SeqNo, QPids, AckTag, UC),
- QMons = pmon:monitor_all(QPids, QMon),
- State1#q{queue_monitors = QMons,
- publish_seqno = SeqNo + 1,
- unconfirmed = UC1}
- end, State, Msgs));
+ {AckImmediately, State2} =
+ lists:foldl(
+ fun({Msg, AckTag},
+ {Acks, State1 = #q{publish_seqno = SeqNo,
+ unconfirmed = UC,
+ queue_monitors = QMons}}) ->
+ case dead_letter_publish(Msg, Reason, X, State1) of
+ [] -> {[AckTag | Acks], State1};
+ QPids -> UC1 = dtree:insert(
+ SeqNo, QPids, AckTag, UC),
+ QMons1 = pmon:monitor_all(QPids, QMons),
+ {Acks,
+ State1#q{publish_seqno = SeqNo + 1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1}}
+ end
+ end, {[], State}, Msgs),
+ cleanup_after_confirm(AckImmediately, State2);
{error, not_found} ->
cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
end;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 00de3e17..329144f9 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -153,7 +153,7 @@
%% Acktags supplied are for messages which should be processed. The
%% provided callback function is called with each message.
--callback fold(msg_fun(), state(), [ack()]) -> state().
+-callback foreach_ack(msg_fun(), state(), [ack()]) -> state().
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
@@ -220,7 +220,7 @@ behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
- {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
+ {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 961636b1..39060c09 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, fold/3]).
+ status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
-export([start/1, stop/0]).
@@ -310,9 +310,9 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-fold(MsgFun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }, AckTags) ->
- State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }.
+foreach_ack(MsgFun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 912bd3b6..176374ce 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2534,7 +2534,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_variable_queue_fold_msg_on_disk(VQ0) ->
VQ1 = variable_queue_publish(true, 1, VQ0),
{VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
- VQ3 = rabbit_variable_queue:fold(fun (_M, _A) -> ok end, VQ2, AckTags),
+ VQ3 = rabbit_variable_queue:foreach_ack(fun (_M, _A) -> ok end,
+ VQ2, AckTags),
VQ3.
test_queue_recover() ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e9d3c6e3..7636d5ea 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -21,7 +21,7 @@
dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1,
depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0, fold/3]).
+ is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]).
-export([start/1, stop/0]).
@@ -647,9 +647,9 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-fold(undefined, State, _AckTags) ->
+foreach_ack(undefined, State, _AckTags) ->
State;
-fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
+foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
a(lists:foldl(fun(SeqId, State1) ->
{MsgStatus, State2} =
read_msg(gb_trees:get(SeqId, PA), false, State1),