summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-23 16:35:43 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-23 16:35:43 +0100
commit5e6173aa60ae4468e0dfc8481ae1378f0af5b2eb (patch)
tree571561b82e6c0e563038e4aaf88113b78d531bce
parenta03f41b8beeca6306fc149894b9ae5680db5b0cf (diff)
downloadrabbitmq-server-5e6173aa60ae4468e0dfc8481ae1378f0af5b2eb.tar.gz
Clearer var naming in BQ:dropwhile. Fix in amqqueue_process:drop_expired_messages.
Also, corrected the dropwhile-related comments in rabbit_backing_queue. For what concerns the fix in drop_expired_messages, I was wrongly generating acktags for dead messages not to be dead-lettered.
-rw-r--r--src/rabbit_amqqueue_process.erl20
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_variable_queue.erl12
4 files changed, 21 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a370a25e..0cf7de40 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -696,14 +696,18 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ backing_queue = BQ }) ->
Now = now_micros(),
- {Msgs, BQS1} =
- BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- true, BQS),
DLXFun = dead_letter_fun(expired, State),
- lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
+ ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ case DLXFun of
+ undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS),
+ BQS1;
+ _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS),
+ lists:foreach(
+ fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
+ BQS1
+ end,
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
@@ -720,9 +724,7 @@ ensure_ttl_timer(State) ->
State.
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
- fun(_Msg, _AckTag) ->
- ok
- end;
+ undefined;
dead_letter_fun(Reason, _State) ->
fun(Msg, AckTag) ->
gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 150c2551..28c57bb0 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -120,8 +120,8 @@
%% Drop messages from the head of the queue while the supplied predicate returns
%% true. Also accepts a boolean parameter that determines whether the messages
-%% are to be acked or not. If they are, the messages and the acktags are
-%% returned.
+%% necessitate an ack or not. If they do, the function returns a list of
+%% messages with the respective acktags.
-callback dropwhile(msg_pred(), true, state())
-> {[{rabbit_types:basic_message(), ack()}], state()};
(msg_pred(), false, state())
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 3afa5b60..551fdf18 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -168,13 +168,13 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 })}.
-dropwhile(Pred, AckMsgs,
+dropwhile(Pred, AckRequired,
State = #state{gm = GM,
backing_queue = BQ,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Msgs, BQS1} = BQ:dropwhile(Pred, AckMsgs, BQS),
+ {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
ok = gm:broadcast(GM, {set_length, Len1}),
Dropped = Len - Len1,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c418cc4d..ddbe6bcc 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -578,24 +578,24 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, AckMsgs, State) ->
- End = fun(S) when AckMsgs -> {[], S};
- (S) -> {undefined, S}
+dropwhile(Pred, AckRequired, State) ->
+ End = fun(S) when AckRequired -> {[], S};
+ (S) -> {undefined, S}
end,
case queue_out(State) of
{empty, State1} ->
End(a(State1));
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), AckMsgs} of
+ case {Pred(MsgProps), AckRequired} of
{true, true} ->
{MsgStatus1, State2} = read_msg(MsgStatus, State1),
{{Msg, _, AckTag, _}, State3} =
internal_fetch(true, MsgStatus1, State2),
- {L, State4} = dropwhile(Pred, AckMsgs, State3),
+ {L, State4} = dropwhile(Pred, AckRequired, State3),
{[{Msg, AckTag} | L], State4};
{true, false} ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, AckMsgs, State2);
+ dropwhile(Pred, AckRequired, State2);
{false, _} ->
End(a(in_r(MsgStatus, State1)))
end