summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-05-07 17:47:07 +0100
committerEmile Joubert <emile@rabbitmq.com>2013-05-07 17:47:07 +0100
commit3f8d2741e08558b8bc1030e957487cfd6b2d9c3a (patch)
tree087e3b5b7bb2357a25b3dbdbfdf1ffd5373894fd
parent5bb9d6aa334df375145d1fab37c586f996c3e06d (diff)
downloadrabbitmq-server-bug25460.tar.gz
flow control for DLX, a first draftbug25460
-rw-r--r--src/rabbit_amqqueue_process.erl76
1 files changed, 68 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 066392f8..68b6be07 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -816,22 +816,82 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
backing_queue_state = BQS,
backing_queue = BQ}) ->
QName = qname(State),
- {Res, Acks1, BQS1} =
- Fun(fun (Msg, AckTag, Acks) ->
- dead_letter_publish(Msg, Reason, X, RK, QName),
- [AckTag | Acks]
- end, [], BQS),
+ {Res, {Acks1, {Ref1, Proxy1, _DeadQPids1}}, BQS1} =
+ Fun(fun (Msg, AckTag, {Acks, Acc}) ->
+ {[AckTag | Acks],
+ dead_letter_publish(Msg, Reason, X, RK, QName, Acc)}
+ end, {[], {undefined, undefined, ordsets:new()}}, BQS),
{_Guids, BQS2} = BQ:ack(Acks1, BQS1),
+ shutdown_proxy(Ref1, Proxy1),
{Res, State#q{backing_queue_state = BQS2}}.
-dead_letter_publish(Msg, Reason, X, RK, QName) ->
+dead_letter_publish(Msg, Reason, X, RK, QName, {Ref, Proxy, DeadQPids}) ->
DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
{Queues, Cycles} = detect_dead_letter_cycles(
Reason, DLMsg, rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
- rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery),
- ok.
+ Qs = filter_dead_qpids(rabbit_amqqueue:lookup(Queues), DeadQPids),
+ {Self, Qs1} = lists:splitwith(fun (#amqqueue{name = QN}) -> QN =:= QName
+ end, Qs),
+ DeadQPids1 = dead_letter_publish1(Qs1, Delivery, Ref, Proxy, DeadQPids),
+ rabbit_amqqueue:deliver(Self, Delivery),
+ DeadQPids1.
+
+dead_letter_publish1([], _Delivery, Ref, Proxy, DeadQPids) ->
+ {Ref, Proxy, DeadQPids};
+dead_letter_publish1(Qs, Delivery, Ref0, Proxy0, DeadQPids) ->
+ {Ref, Proxy} = launch_proxy(Ref0, Proxy0),
+ Proxy ! {monitor, Ref, [QPid || #amqqueue{pid = QPid} <- Qs]},
+ {Ref, Proxy, dead_letter_publish2(Qs, Delivery, Ref, DeadQPids)}.
+
+dead_letter_publish2(Qs, Delivery, Ref, DeadQPids) ->
+ case credit_flow:blocked() of
+ true -> receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ dead_letter_publish2(Qs, Delivery, Ref, DeadQPids);
+ {queue_down_received, Ref, QPid} ->
+ credit_flow:peer_down(QPid),
+ DeadQPids1 = ordsets:add_element(QPid, DeadQPids),
+ LiveQs = filter_dead_qpids(Qs, DeadQPids1),
+ dead_letter_publish2(LiveQs, Delivery, Ref, DeadQPids1)
+ end;
+ false -> rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ DeadQPids
+ end.
+
+launch_proxy(undefined, undefined) ->
+ {Ref, Self} = {make_ref(), self()},
+ Proxy = spawn_link(fun () -> queue_down_proxy(Ref, pmon:new(), Self) end),
+ {Ref, Proxy};
+launch_proxy(Ref, Proxy) ->
+ {Ref, Proxy}.
+
+shutdown_proxy(undefined, undefined) -> ok;
+shutdown_proxy(Ref, Proxy) -> Proxy ! {stop, Ref},
+ ok.
+
+%% Handle destination queue 'DOWN' notifications on behalf of the source
+%% queue to distinguish from channel 'DOWN' notifications
+queue_down_proxy(Ref, QMons, Recipient) ->
+ receive
+ {monitor, Ref, Items} ->
+ QMons1 = pmon:monitor_all(Items, QMons),
+ queue_down_proxy(Ref, QMons1, Recipient);
+ {'DOWN', _, process, QPid, _} ->
+ Recipient ! {queue_down_received, Ref, QPid},
+ queue_down_proxy(Ref, QMons, Recipient);
+ {stop, Ref} ->
+ unlink(Recipient),
+ lists:foldl(fun (Item, QMons1) ->
+ pmon:demonitor(Item, QMons1)
+ end, QMons, pmon:monitored(QMons))
+ end.
+
+filter_dead_qpids(Qs, DeadQPids) ->
+ [Q || Q = #amqqueue{pid = QPid} <- Qs,
+ not ordsets:is_element(QPid, DeadQPids)].
stop(State) -> stop(noreply, State).