summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-09-13 12:11:31 +0100
committerEmile Joubert <emile@rabbitmq.com>2012-09-13 12:11:31 +0100
commitaa2793cf0c6edd08e6f46fcd35c05b998940ef48 (patch)
treed081bd5e884e969a753248f3b3f57fad0a9f2929
parentbd3f124776d33abfa759f7f8ba87da8e118a0da2 (diff)
parentcbf6876df093db5c71e7cb100b880e309d837401 (diff)
downloadrabbitmq-server-aa2793cf0c6edd08e6f46fcd35c05b998940ef48.tar.gz
Merged bug24888 into default
-rw-r--r--src/rabbit_amqqueue_process.erl72
1 files changed, 36 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f5a3a5f1..20ba4574 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -726,8 +726,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
{Next, BQS2};
_ ->
{Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
- lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
- Msgs),
+ DLXFun(Msgs),
{Next, BQS2}
end,
ensure_ttl_timer(case Props of
@@ -767,37 +766,17 @@ ack_if_no_dlx(_AckTags, State) ->
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
undefined;
dead_letter_fun(Reason, _State) ->
- fun(Msg, AckTag) ->
- gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
- end.
-
-dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) ->
- DLMsg = #basic_message{exchange_name = XName} =
- make_dead_letter_msg(Reason, Msg, State),
- case rabbit_exchange:lookup(XName) of
- {ok, X} ->
- Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
- {Queues, Cycles} = detect_dead_letter_cycles(
- DLMsg, rabbit_exchange:route(X, Delivery)),
- lists:foreach(fun log_cycle_once/1, Cycles),
- QPids = rabbit_amqqueue:lookup(Queues),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
- DeliveredQPids;
- {error, not_found} ->
- []
- end.
-
-dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC}) ->
- QPids = dead_letter_publish(Msg, Reason, State),
- State1 = State#q{queue_monitors = pmon:monitor_all(
- QPids, State#q.queue_monitors),
- publish_seqno = MsgSeqNo + 1},
- case QPids of
- [] -> cleanup_after_confirm([AckTag], State1);
- _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
- noreply(State1#q{unconfirmed = UC1})
- end.
+ fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
+
+dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
+ DLMsg = make_dead_letter_msg(Reason, Msg, State),
+ Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
+ {Queues, Cycles} = detect_dead_letter_cycles(
+ DLMsg, rabbit_exchange:route(X, Delivery)),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ QPids = rabbit_amqqueue:lookup(Queues),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ DeliveredQPids.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed = UC}) ->
@@ -1244,7 +1223,12 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
false -> fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- Fun = dead_letter_fun(rejected, State1),
+ Fun =
+ case dead_letter_fun(rejected, State1) of
+ undefined -> undefined;
+ F -> fun(M, A) -> F([{M, A}])
+ end
+ end,
BQS1 = BQ:fold(Fun, BQS, AckTags),
ack_if_no_dlx(
AckTags,
@@ -1296,8 +1280,24 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
- dead_letter_msg(Msg, AckTag, Reason, State);
+handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
+ case rabbit_exchange:lookup(XName) of
+ {ok, X} ->
+ noreply(lists:foldl(
+ fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo,
+ unconfirmed = UC,
+ queue_monitors = QMon}) ->
+ QPids = dead_letter_publish(Msg, Reason, X,
+ State1),
+ UC1 = dtree:insert(SeqNo, QPids, AckTag, UC),
+ QMons = pmon:monitor_all(QPids, QMon),
+ State1#q{queue_monitors = QMons,
+ publish_seqno = SeqNo + 1,
+ unconfirmed = UC1}
+ end, State, Msgs));
+ {error, not_found} ->
+ cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
+ end;
handle_cast(wake_up, State) ->
noreply(State).