summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-11-23 13:50:37 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-11-23 13:50:37 +0000
commit0e455cb23ccc8045361b9769f5fbf378254b4013 (patch)
tree0ab1e31b28eab9083b9b70dcd7218c2e6f96eaf8
parent75cec9b38c1debbeb4cf5ac37c0c4b781d620485 (diff)
downloadrabbitmq-server-0e455cb23ccc8045361b9769f5fbf378254b4013.tar.gz
Propagate API change
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl7
3 files changed, 17 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b3c44a50..743d72ef 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1154,6 +1154,11 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(requeue(AckTags, ChPid, State));
+handle_call({fold, Fun, Acc}, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {Reply, BQS1} = BQ:fold(Fun, Acc, BQS),
+ reply(Reply, State #q{backing_queue_state = BQS1});
+
handle_call(start_mirroring, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
%% lookup again to get policy for init_with_existing_bq
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 329144f9..f0d0b46c 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -159,6 +159,11 @@
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+%% Fold over all the messages in a queue and return the accumulated
+%% results, leaving the queue undisturbed.
+-callback fold(fun((rabbit_types:basic_message(), any()) -> any()),
+ any(), state()) -> {any(), state()}.
+
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
@@ -220,7 +225,7 @@ behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
- {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {len, 1},
+ {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 39060c09..15aeea01 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -19,7 +19,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2,
- requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
@@ -321,6 +321,11 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+fold(Fun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:fold(Fun, Acc, BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:len(BQS).