summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-19 17:34:28 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-19 17:34:28 +0100
commitf1bf76e40adb74134c113abaddf424ec895ee864 (patch)
tree851e61d1ebe4eb6fd1f566b5c30279cfb905ebeb
parent7dc0a3e863acb9f01319de9dddaf7b9d6499523e (diff)
downloadrabbitmq-server-f1bf76e40adb74134c113abaddf424ec895ee864.tar.gz
Changed BQ:dropwhile/3 to take a boolean arg to decide wether to ack or not.
I'm most likely going to change this to two different functions, the only thing that is holding me back is that I'll end up writing one function and then two wrappers around it in both instances (variable_queue and queue_master).
-rw-r--r--src/rabbit_amqqueue_process.erl14
-rw-r--r--src/rabbit_backing_queue.erl8
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_tests.erl14
-rw-r--r--src/rabbit_variable_queue.erl34
6 files changed, 44 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2063e557..a370a25e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -698,10 +698,12 @@ drop_expired_messages(State = #q{ttl = undefined}) ->
drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
Now = now_micros(),
- BQS1 = BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- dead_letter_fun(expired, State),
- BQS),
+ {Msgs, BQS1} =
+ BQ:dropwhile(
+ fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ true, BQS),
+ DLXFun = dead_letter_fun(expired, State),
+ lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
@@ -718,7 +720,9 @@ ensure_ttl_timer(State) ->
State.
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
- undefined;
+ fun(_Msg, _AckTag) ->
+ ok
+ end;
dead_letter_fun(Reason, _State) ->
fun(Msg, AckTag) ->
gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 6cc1c3fd..f069575f 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -35,6 +35,7 @@
-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
'undefined').
+-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
%% aren't being started at this point, but this call allows the
@@ -120,9 +121,10 @@
%% Drop messages from the head of the queue while the supplied
%% predicate returns true. A callback function is supplied allowing
%% callers access to messages that are about to be dropped.
--callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(),
- state())
- -> state().
+-callback dropwhile(msg_pred(), true, state())
+ -> {[{rabbit_types:basic_message(), ack()}], state()};
+ (msg_pred(), false, state())
+ -> {undefined, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 286b69e4..78ababe8 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
{call, ?BQMOD, drain_confirmed, [BQ]}.
qc_dropwhile(#state{bqstate = BQ}) ->
- {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}.
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, true, BQ]}.
qc_is_empty(#state{bqstate = BQ}) ->
{call, ?BQMOD, is_empty, [BQ]}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e6ef5c57..3afa5b60 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -168,19 +168,19 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 })}.
-dropwhile(Pred, MsgFun,
+dropwhile(Pred, AckMsgs,
State = #state{gm = GM,
backing_queue = BQ,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
+ {Msgs, BQS1} = BQ:dropwhile(Pred, AckMsgs, BQS),
Len1 = BQ:len(BQS1),
ok = gm:broadcast(GM, {set_length, Len1}),
Dropped = Len - Len1,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 }.
+ {Msgs, State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 } }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c74b8d5f..3e85e3d5 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2388,10 +2388,10 @@ test_dropwhile(VQ0) ->
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
- VQ2 = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, undefined, VQ1),
+ {undefined, VQ2} = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, false, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2408,11 +2408,11 @@ test_dropwhile(VQ0) ->
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- VQ3 = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, undefined, VQ2),
+ {undefined, VQ3} = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, false, VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5).
+ rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5).
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c3462929..c418cc4d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,13 +16,12 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
+-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
+ publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1,
- needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3,
- multiple_routing_keys/0, fold/3]).
+ set_ram_duration_target/2, ram_duration/1, needs_timeout/1,
+ timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
+ is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -579,23 +578,26 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, MsgFun, State) ->
+dropwhile(Pred, AckMsgs, State) ->
+ End = fun(S) when AckMsgs -> {[], S};
+ (S) -> {undefined, S}
+ end,
case queue_out(State) of
{empty, State1} ->
- a(State1);
+ End(a(State1));
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), MsgFun} of
- {true, undefined} ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, MsgFun, State2);
- {true, _} ->
+ case {Pred(MsgProps), AckMsgs} of
+ {true, true} ->
{MsgStatus1, State2} = read_msg(MsgStatus, State1),
{{Msg, _, AckTag, _}, State3} =
internal_fetch(true, MsgStatus1, State2),
- ok = MsgFun(Msg, AckTag),
- dropwhile(Pred, MsgFun, State3);
+ {L, State4} = dropwhile(Pred, AckMsgs, State3),
+ {[{Msg, AckTag} | L], State4};
+ {true, false} ->
+ {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile(Pred, AckMsgs, State2);
{false, _} ->
- a(in_r(MsgStatus, State1))
+ End(a(in_r(MsgStatus, State1)))
end
end.