summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-12-05 16:28:25 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-12-05 16:28:25 +0000
commit3761e87b40caaf58962655130bb5eea3f9d20860 (patch)
treec40c2a8eedc55028f8920176cffc20bd219078bd
parenta38e4cfeb5e1657210c8badcccef54d55f48f22b (diff)
downloadrabbitmq-server-3761e87b40caaf58962655130bb5eea3f9d20860.tar.gz
send expired messages to self() one at a time
which simplifies the code a fair bit and also means we don't build up a potentially huge intermediate data structure.
-rw-r--r--src/rabbit_amqqueue_process.erl69
1 files changed, 31 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2ffa2a1a..2bb476fb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -724,26 +724,24 @@ drop_expired_messages(State = #q{dlx = DLX,
backing_queue = BQ }) ->
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} = case DLX of
- undefined -> BQ:dropwhile(ExpirePred, BQS);
- _ -> {Next, Msgs, BQS2} =
- BQ:fetchwhile(ExpirePred,
- fun accumulate_msgs/4,
- [], BQS),
- case Msgs of
- [] -> ok;
- _ -> (dead_letter_fun(expired))(
- lists:reverse(Msgs))
- end,
- {Next, BQS2}
- end,
+ {Props, BQS1} =
+ case DLX of
+ undefined -> BQ:dropwhile(ExpirePred, BQS);
+ _ -> DLXFun = dead_letter_fun(expired),
+ {Next, ok, BQS2} =
+ BQ:fetchwhile(
+ ExpirePred,
+ fun (Msg, _IsDelivered, AckTag, Acc) ->
+ DLXFun(Msg, AckTag),
+ Acc
+ end, ok, BQS),
+ {Next, BQS2}
+ end,
ensure_ttl_timer(case Props of
undefined -> undefined;
#message_properties{expiry = Exp} -> Exp
end, State#q{backing_queue_state = BQS1}).
-accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc].
-
ensure_ttl_timer(undefined, State) ->
State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
@@ -764,7 +762,9 @@ ensure_ttl_timer(_Expiry, State) ->
State.
dead_letter_fun(Reason) ->
- fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
+ fun(Msg, AckTag) ->
+ gen_server2:cast(self(), {dead_letter, Msg, AckTag, Reason})
+ end.
dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
DLMsg = make_dead_letter_msg(Reason, Msg, State),
@@ -1213,8 +1213,7 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end,
- BQS, AckTags),
+ BQS1 = BQ:foreach_ack(DLXFun, BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
@@ -1262,29 +1261,23 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
+handle_cast({dead_letter, Msg, AckTag, Reason},
+ State = #q{dlx = XName,
+ publish_seqno = SeqNo,
+ unconfirmed = UC,
+ queue_monitors = QMons}) ->
case rabbit_exchange:lookup(XName) of
{ok, X} ->
- {AckImmediately, State2} =
- lists:foldl(
- fun({Msg, AckTag},
- {Acks, State1 = #q{publish_seqno = SeqNo,
- unconfirmed = UC,
- queue_monitors = QMons}}) ->
- case dead_letter_publish(Msg, Reason, X, State1) of
- [] -> {[AckTag | Acks], State1};
- QPids -> UC1 = dtree:insert(
- SeqNo, QPids, AckTag, UC),
- QMons1 = pmon:monitor_all(QPids, QMons),
- {Acks,
- State1#q{publish_seqno = SeqNo + 1,
- unconfirmed = UC1,
- queue_monitors = QMons1}}
- end
- end, {[], State}, Msgs),
- cleanup_after_confirm(AckImmediately, State2);
+ case dead_letter_publish(Msg, Reason, X, State) of
+ [] -> cleanup_after_confirm([AckTag], State);
+ QPids -> UC1 = dtree:insert(SeqNo, QPids, AckTag, UC),
+ QMons1 = pmon:monitor_all(QPids, QMons),
+ State#q{publish_seqno = SeqNo + 1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1}
+ end;
{error, not_found} ->
- cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
+ cleanup_after_confirm([AckTag], State)
end;
handle_cast(start_mirroring, State = #q{backing_queue = BQ,