diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-16 13:16:48 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-16 13:16:48 +0000 |
commit | d19f9bad5769175589a91badb6f35d62ab125018 (patch) | |
tree | 67bbae3c92d11064030528da15f6a7e3bd27f8ec | |
parent | bc197ff9f39b9fce43effa90ba641e88530da96b (diff) | |
download | rabbitmq-server-d19f9bad5769175589a91badb6f35d62ab125018.tar.gz |
provide two flavours of rabbit_amqqueue:deliver
- one with flow control and one without
This allows us to leave the various existing call sites of
rabbit_amqqueue:deliver unchanged, and only subject the channel to
flow control. It also reduces the distance between the two places that
need to know which messages should be subject to flow control by
moving the rabbit_flow invocation logic from the channel into
rabbit_amqqueue.
-rw-r--r-- | src/rabbit_amqqueue.erl | 78 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 8 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 |
4 files changed, 56 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 41e644f2..94a99a49 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -20,7 +20,7 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, requeue/3, ack/3, reject/4]). + stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). @@ -120,6 +120,8 @@ -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> {routing_result(), qpids()}). +-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). @@ -425,39 +427,9 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). -deliver([], #delivery{mandatory = false, immediate = false}) -> - %% /dev/null optimisation - {routed, []}; - -deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) -> - %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver will deliver the message to the queue - %% process asynchronously, and return true, which means all the - %% QPids will always be returned. It is therefore safe to use a - %% fire-and-forget cast here and return the QPids - the semantics - %% is preserved. This scales much better than the non-immediate - %% case below. - QPids = qpids(Qs), - delegate:invoke_no_result( - QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end), - {routed, QPids}; +deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). -deliver(Qs, Delivery = #delivery{mandatory = Mandatory, - immediate = Immediate}) -> - QPids = qpids(Qs), - {Success, _} = - delegate:invoke( - QPids, fun (QPid) -> - gen_server2:call(QPid, {deliver, Delivery}, infinity) - end), - case {Mandatory, Immediate, - lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; - ({_, false}, {_, H}) -> {true, H} - end, {false, []}, Success)} of - {true, _ , {false, []}} -> {unroutable, []}; - {_ , true, {_ , []}} -> {not_delivered, []}; - {_ , _ , {_ , R}} -> {routed, R} - end. +deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). @@ -549,6 +521,46 @@ pseudo_queue(QueueName, Pid) -> slave_pids = [], mirror_nodes = undefined}. +deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> + %% /dev/null optimisation + {routed, []}; + +deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> + %% optimisation: when Mandatory = false and Immediate = false, + %% rabbit_amqqueue:deliver will deliver the message to the queue + %% process asynchronously, and return true, which means all the + %% QPids will always be returned. It is therefore safe to use a + %% fire-and-forget cast here and return the QPids - the semantics + %% is preserved. This scales much better than the non-immediate + %% case below. + QPids = qpids(Qs), + case Flow of + flow -> [credit_flow:send(QPid) || QPid <- QPids]; + noflow -> ok + end, + delegate:invoke_no_result( + QPids, fun (QPid) -> + gen_server2:cast(QPid, {deliver, Delivery, Flow}) + end), + {routed, QPids}; + +deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}, + _Flow) -> + QPids = qpids(Qs), + {Success, _} = + delegate:invoke( + QPids, fun (QPid) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity) + end), + case {Mandatory, Immediate, + lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; + ({_, false}, {_, H}) -> {true, H} + end, {false, []}, Success)} of + {true, _ , {false, []}} -> {unroutable, []}; + {_ , true, {_ , []}} -> {not_delivered, []}; + {_ , _ , {_ , R}} -> {routed, R} + end. + qpids(Qs) -> lists:append([[QPid | SPids] || #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 44ead85e..af80905b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1045,10 +1045,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. ch_record_publisher(Sender), - credit_flow:ack(Sender), + case Flow of + flow -> credit_flow:ack(Sender); + noflow -> ok + end, noreply(deliver_or_enqueue(Delivery, State)); handle_cast({ack, AckTags, ChPid}, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f6972b2f..680e486d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1346,17 +1346,11 @@ notify_limiter(Limiter, Acked) -> deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, - mandatory = Mandatory, - immediate = Immediate, msg_seq_no = MsgSeqNo}, QNames}, State) -> {RoutingRes, DeliveredQPids} = - rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery), + rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids), - case {Mandatory, Immediate} of - {false, false} -> [credit_flow:send(QPid) || QPid <- DeliveredQPids]; - _ -> ok - end, State2 = process_routing_result(RoutingRes, DeliveredQPids, XName, MsgSeqNo, Message, State1), maybe_incr_stats([{XName, 1} | diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 0e941018..06c5beac 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -207,9 +207,12 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - credit_flow:ack(Sender), + case Flow of + flow -> credit_flow:ack(Sender); + noflow -> ok + end, noreply(maybe_enqueue_message(Delivery, true, State)); handle_cast({set_maximum_since_use, Age}, State) -> |