diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-05-07 17:47:07 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-05-07 17:47:07 +0100 |
commit | 3f8d2741e08558b8bc1030e957487cfd6b2d9c3a (patch) | |
tree | 087e3b5b7bb2357a25b3dbdbfdf1ffd5373894fd | |
parent | 5bb9d6aa334df375145d1fab37c586f996c3e06d (diff) | |
download | rabbitmq-server-bug25460.tar.gz |
flow control for DLX, a first draftbug25460
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 76 |
1 files changed, 68 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 066392f8..68b6be07 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -816,22 +816,82 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, backing_queue_state = BQS, backing_queue = BQ}) -> QName = qname(State), - {Res, Acks1, BQS1} = - Fun(fun (Msg, AckTag, Acks) -> - dead_letter_publish(Msg, Reason, X, RK, QName), - [AckTag | Acks] - end, [], BQS), + {Res, {Acks1, {Ref1, Proxy1, _DeadQPids1}}, BQS1} = + Fun(fun (Msg, AckTag, {Acks, Acc}) -> + {[AckTag | Acks], + dead_letter_publish(Msg, Reason, X, RK, QName, Acc)} + end, {[], {undefined, undefined, ordsets:new()}}, BQS), {_Guids, BQS2} = BQ:ack(Acks1, BQS1), + shutdown_proxy(Ref1, Proxy1), {Res, State#q{backing_queue_state = BQS2}}. -dead_letter_publish(Msg, Reason, X, RK, QName) -> +dead_letter_publish(Msg, Reason, X, RK, QName, {Ref, Proxy, DeadQPids}) -> DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), Delivery = rabbit_basic:delivery(false, DLMsg, undefined), {Queues, Cycles} = detect_dead_letter_cycles( Reason, DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), - rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery), - ok. + Qs = filter_dead_qpids(rabbit_amqqueue:lookup(Queues), DeadQPids), + {Self, Qs1} = lists:splitwith(fun (#amqqueue{name = QN}) -> QN =:= QName + end, Qs), + DeadQPids1 = dead_letter_publish1(Qs1, Delivery, Ref, Proxy, DeadQPids), + rabbit_amqqueue:deliver(Self, Delivery), + DeadQPids1. + +dead_letter_publish1([], _Delivery, Ref, Proxy, DeadQPids) -> + {Ref, Proxy, DeadQPids}; +dead_letter_publish1(Qs, Delivery, Ref0, Proxy0, DeadQPids) -> + {Ref, Proxy} = launch_proxy(Ref0, Proxy0), + Proxy ! {monitor, Ref, [QPid || #amqqueue{pid = QPid} <- Qs]}, + {Ref, Proxy, dead_letter_publish2(Qs, Delivery, Ref, DeadQPids)}. + +dead_letter_publish2(Qs, Delivery, Ref, DeadQPids) -> + case credit_flow:blocked() of + true -> receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + dead_letter_publish2(Qs, Delivery, Ref, DeadQPids); + {queue_down_received, Ref, QPid} -> + credit_flow:peer_down(QPid), + DeadQPids1 = ordsets:add_element(QPid, DeadQPids), + LiveQs = filter_dead_qpids(Qs, DeadQPids1), + dead_letter_publish2(LiveQs, Delivery, Ref, DeadQPids1) + end; + false -> rabbit_amqqueue:deliver_flow(Qs, Delivery), + DeadQPids + end. + +launch_proxy(undefined, undefined) -> + {Ref, Self} = {make_ref(), self()}, + Proxy = spawn_link(fun () -> queue_down_proxy(Ref, pmon:new(), Self) end), + {Ref, Proxy}; +launch_proxy(Ref, Proxy) -> + {Ref, Proxy}. + +shutdown_proxy(undefined, undefined) -> ok; +shutdown_proxy(Ref, Proxy) -> Proxy ! {stop, Ref}, + ok. + +%% Handle destination queue 'DOWN' notifications on behalf of the source +%% queue to distinguish from channel 'DOWN' notifications +queue_down_proxy(Ref, QMons, Recipient) -> + receive + {monitor, Ref, Items} -> + QMons1 = pmon:monitor_all(Items, QMons), + queue_down_proxy(Ref, QMons1, Recipient); + {'DOWN', _, process, QPid, _} -> + Recipient ! {queue_down_received, Ref, QPid}, + queue_down_proxy(Ref, QMons, Recipient); + {stop, Ref} -> + unlink(Recipient), + lists:foldl(fun (Item, QMons1) -> + pmon:demonitor(Item, QMons1) + end, QMons, pmon:monitored(QMons)) + end. + +filter_dead_qpids(Qs, DeadQPids) -> + [Q || Q = #amqqueue{pid = QPid} <- Qs, + not ordsets:is_element(QPid, DeadQPids)]. stop(State) -> stop(noreply, State). |