diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-11-22 15:47:04 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-11-22 15:47:04 +0000 |
commit | 1c86935bb0d27011e20e68169cc9fd5064f11921 (patch) | |
tree | 084c3aa0764f9cde99fd8686e0e7a434c35958ec | |
parent | 6e797a7e99b045a84e737aee20b608d7f6da3d70 (diff) | |
parent | d7bfdd0573b7648bf521ab8300be42782f873c56 (diff) | |
download | rabbitmq-server-1c86935bb0d27011e20e68169cc9fd5064f11921.tar.gz |
Merged default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 32 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 8 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 3 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 6 |
5 files changed, 30 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index dc258fa6..b3c44a50 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1226,7 +1226,7 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end, + BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end, BQS, AckTags), State1#q{backing_queue_state = BQS1} end)); @@ -1278,18 +1278,24 @@ handle_cast({set_maximum_since_use, Age}, State) -> handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) -> case rabbit_exchange:lookup(XName) of {ok, X} -> - noreply(lists:foldl( - fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo, - unconfirmed = UC, - queue_monitors = QMon}) -> - QPids = dead_letter_publish(Msg, Reason, X, - State1), - UC1 = dtree:insert(SeqNo, QPids, AckTag, UC), - QMons = pmon:monitor_all(QPids, QMon), - State1#q{queue_monitors = QMons, - publish_seqno = SeqNo + 1, - unconfirmed = UC1} - end, State, Msgs)); + {AckImmediately, State2} = + lists:foldl( + fun({Msg, AckTag}, + {Acks, State1 = #q{publish_seqno = SeqNo, + unconfirmed = UC, + queue_monitors = QMons}}) -> + case dead_letter_publish(Msg, Reason, X, State1) of + [] -> {[AckTag | Acks], State1}; + QPids -> UC1 = dtree:insert( + SeqNo, QPids, AckTag, UC), + QMons1 = pmon:monitor_all(QPids, QMons), + {Acks, + State1#q{publish_seqno = SeqNo + 1, + unconfirmed = UC1, + queue_monitors = QMons1}} + end + end, {[], State}, Msgs), + cleanup_after_confirm(AckImmediately, State2); {error, not_found} -> cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State) end; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 00de3e17..329144f9 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -153,7 +153,7 @@ %% Acktags supplied are for messages which should be processed. The %% provided callback function is called with each message. --callback fold(msg_fun(), state(), [ack()]) -> state(). +-callback foreach_ack(msg_fun(), state(), [ack()]) -> state(). %% Reinsert messages into the queue which have already been delivered %% and were pending acknowledgement. @@ -220,7 +220,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, - {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, + {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 961636b1..39060c09 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, fold/3]). + status/1, invoke/3, is_duplicate/2, foreach_ack/3]). -export([start/1, stop/0]). @@ -310,9 +310,9 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -fold(MsgFun, State = #state { backing_queue = BQ, - backing_queue_state = BQS }, AckTags) -> - State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }. +foreach_ack(MsgFun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }, AckTags) -> + State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }. requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 912bd3b6..176374ce 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2534,7 +2534,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> test_variable_queue_fold_msg_on_disk(VQ0) -> VQ1 = variable_queue_publish(true, 1, VQ0), {VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1), - VQ3 = rabbit_variable_queue:fold(fun (_M, _A) -> ok end, VQ2, AckTags), + VQ3 = rabbit_variable_queue:foreach_ack(fun (_M, _A) -> ok end, + VQ2, AckTags), VQ3. test_queue_recover() -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e9d3c6e3..7636d5ea 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -21,7 +21,7 @@ dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, multiple_routing_keys/0, fold/3]). + is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]). -export([start/1, stop/0]). @@ -647,9 +647,9 @@ ack(AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. -fold(undefined, State, _AckTags) -> +foreach_ack(undefined, State, _AckTags) -> State; -fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> +foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> a(lists:foldl(fun(SeqId, State1) -> {MsgStatus, State2} = read_msg(gb_trees:get(SeqId, PA), false, State1), |