diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-09-13 12:11:31 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-09-13 12:11:31 +0100 |
commit | aa2793cf0c6edd08e6f46fcd35c05b998940ef48 (patch) | |
tree | d081bd5e884e969a753248f3b3f57fad0a9f2929 | |
parent | bd3f124776d33abfa759f7f8ba87da8e118a0da2 (diff) | |
parent | cbf6876df093db5c71e7cb100b880e309d837401 (diff) | |
download | rabbitmq-server-aa2793cf0c6edd08e6f46fcd35c05b998940ef48.tar.gz |
Merged bug24888 into default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 72 |
1 files changed, 36 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f5a3a5f1..20ba4574 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -726,8 +726,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, {Next, BQS2}; _ -> {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), - lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, - Msgs), + DLXFun(Msgs), {Next, BQS2} end, ensure_ttl_timer(case Props of @@ -767,37 +766,17 @@ ack_if_no_dlx(_AckTags, State) -> dead_letter_fun(_Reason, #q{dlx = undefined}) -> undefined; dead_letter_fun(Reason, _State) -> - fun(Msg, AckTag) -> - gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) - end. - -dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) -> - DLMsg = #basic_message{exchange_name = XName} = - make_dead_letter_msg(Reason, Msg, State), - case rabbit_exchange:lookup(XName) of - {ok, X} -> - Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo), - {Queues, Cycles} = detect_dead_letter_cycles( - DLMsg, rabbit_exchange:route(X, Delivery)), - lists:foreach(fun log_cycle_once/1, Cycles), - QPids = rabbit_amqqueue:lookup(Queues), - {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), - DeliveredQPids; - {error, not_found} -> - [] - end. - -dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC}) -> - QPids = dead_letter_publish(Msg, Reason, State), - State1 = State#q{queue_monitors = pmon:monitor_all( - QPids, State#q.queue_monitors), - publish_seqno = MsgSeqNo + 1}, - case QPids of - [] -> cleanup_after_confirm([AckTag], State1); - _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC), - noreply(State1#q{unconfirmed = UC1}) - end. + fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end. + +dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) -> + DLMsg = make_dead_letter_msg(Reason, Msg, State), + Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo), + {Queues, Cycles} = detect_dead_letter_cycles( + DLMsg, rabbit_exchange:route(X, Delivery)), + lists:foreach(fun log_cycle_once/1, Cycles), + QPids = rabbit_amqqueue:lookup(Queues), + {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), + DeliveredQPids. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, unconfirmed = UC}) -> @@ -1244,7 +1223,12 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> true -> fun (State1) -> requeue_and_run(AckTags, State1) end; false -> fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - Fun = dead_letter_fun(rejected, State1), + Fun = + case dead_letter_fun(rejected, State1) of + undefined -> undefined; + F -> fun(M, A) -> F([{M, A}]) + end + end, BQS1 = BQ:fold(Fun, BQS, AckTags), ack_if_no_dlx( AckTags, @@ -1296,8 +1280,24 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> - dead_letter_msg(Msg, AckTag, Reason, State); +handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) -> + case rabbit_exchange:lookup(XName) of + {ok, X} -> + noreply(lists:foldl( + fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo, + unconfirmed = UC, + queue_monitors = QMon}) -> + QPids = dead_letter_publish(Msg, Reason, X, + State1), + UC1 = dtree:insert(SeqNo, QPids, AckTag, UC), + QMons = pmon:monitor_all(QPids, QMon), + State1#q{queue_monitors = QMons, + publish_seqno = SeqNo + 1, + unconfirmed = UC1} + end, State, Msgs)); + {error, not_found} -> + cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State) + end; handle_cast(wake_up, State) -> noreply(State). |