diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-11-23 13:50:37 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-11-23 13:50:37 +0000 |
commit | 0e455cb23ccc8045361b9769f5fbf378254b4013 (patch) | |
tree | 0ab1e31b28eab9083b9b70dcd7218c2e6f96eaf8 | |
parent | 75cec9b38c1debbeb4cf5ac37c0c4b781d620485 (diff) | |
download | rabbitmq-server-0e455cb23ccc8045361b9769f5fbf378254b4013.tar.gz |
Propagate API change
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 7 |
3 files changed, 17 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b3c44a50..743d72ef 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1154,6 +1154,11 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(requeue(AckTags, ChPid, State)); +handle_call({fold, Fun, Acc}, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {Reply, BQS1} = BQ:fold(Fun, Acc, BQS), + reply(Reply, State #q{backing_queue_state = BQS1}); + handle_call(start_mirroring, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> %% lookup again to get policy for init_with_existing_bq diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 329144f9..f0d0b46c 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -159,6 +159,11 @@ %% and were pending acknowledgement. -callback requeue([ack()], state()) -> {msg_ids(), state()}. +%% Fold over all the messages in a queue and return the accumulated +%% results, leaving the queue undisturbed. +-callback fold(fun((rabbit_types:basic_message(), any()) -> any()), + any(), state()) -> {any(), state()}. + %% How long is my queue? -callback len(state()) -> non_neg_integer(). @@ -220,7 +225,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, - {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {len, 1}, + {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 39060c09..15aeea01 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -19,7 +19,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, drop/2, ack/2, - requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, + requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, foreach_ack/3]). @@ -321,6 +321,11 @@ requeue(AckTags, State = #state { gm = GM, ok = gm:broadcast(GM, {requeue, MsgIds}), {MsgIds, State #state { backing_queue_state = BQS1 }}. +fold(Fun, Acc, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {Result, BQS1} = BQ:fold(Fun, Acc, BQS), + {Result, State #state { backing_queue_state = BQS1 }}. + len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:len(BQS). |