summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-01-21 14:28:05 +0000
committerTim Watson <tim@rabbitmq.com>2013-01-21 14:28:05 +0000
commit3a79219d6ea1acf176a92110c10f5d223baa9fbd (patch)
treec81a0831e97d2e7ae03b0a481fd1093f92b897a2
parent154510f6dc1ae3146b21b02d2c07a7a9d3ff8183 (diff)
parenta4891ce1d6006c6f36c8408d96028b7b3ee35be9 (diff)
downloadrabbitmq-server-3a79219d6ea1acf176a92110c10f5d223baa9fbd.tar.gz
merge bug25409 into default
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl4
4 files changed, 27 insertions, 3 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 9a3c67f9..2b43c8ba 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -75,6 +75,10 @@
%% except those that have been fetched already and are pending acks.
-callback purge(state()) -> {purged_msg_count(), state()}.
+%% Remove all messages in the queue which have been fetched and are
+%% pending acks.
+-callback purge_acks(state()) -> state().
+
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
rabbit_types:message_properties(), boolean(), pid(),
@@ -226,7 +230,7 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 5},
+ {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4},
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b5f72cad..c704804e 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/5, publish_delivered/4,
+ purge/1, purge_acks/1, publish/5, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
@@ -198,6 +198,8 @@ purge(State = #state { gm = GM,
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1 }}.
+purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}).
+
publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
State = #state { gm = GM,
seen_status = SS,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 13454d31..7bd8d541 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2323,6 +2323,7 @@ test_variable_queue() ->
fun test_dropwhile_varying_ram_duration/1,
fun test_fetchwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
+ fun test_variable_queue_purge/1,
fun test_variable_queue_requeue/1,
fun test_variable_queue_fold/1]],
passed.
@@ -2418,6 +2419,21 @@ test_variable_queue_requeue(VQ0) ->
{empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2),
VQ3.
+test_variable_queue_purge(VQ0) ->
+ LenDepth = fun (VQ) ->
+ {rabbit_variable_queue:len(VQ),
+ rabbit_variable_queue:depth(VQ)}
+ end,
+ VQ1 = variable_queue_publish(false, 10, VQ0),
+ {VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1),
+ {4, VQ3} = rabbit_variable_queue:purge(VQ2),
+ {0, 6} = LenDepth(VQ3),
+ {_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3),
+ {2, 6} = LenDepth(VQ4),
+ VQ5 = rabbit_variable_queue:purge_acks(VQ4),
+ {2, 2} = LenDepth(VQ5),
+ VQ5.
+
test_variable_queue_ack_limiting(VQ0) ->
%% start by sending in a bunch of messages
Len = 1024,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8a7045ea..7e09e5e3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,7 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2, purge/1,
+-export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/2, fetchwhile/4,
fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
@@ -519,6 +519,8 @@ purge(State = #vqstate { q4 = Q4,
ram_msg_count = 0,
persistent_count = PCount1 })}.
+purge_acks(State) -> a(purge_pending_ack(false, State)).
+
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,