summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-01-16 13:16:48 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-01-16 13:16:48 +0000
commitd19f9bad5769175589a91badb6f35d62ab125018 (patch)
tree67bbae3c92d11064030528da15f6a7e3bd27f8ec
parentbc197ff9f39b9fce43effa90ba641e88530da96b (diff)
downloadrabbitmq-server-d19f9bad5769175589a91badb6f35d62ab125018.tar.gz
provide two flavours of rabbit_amqqueue:deliver
- one with flow control and one without This allows us to leave the various existing call sites of rabbit_amqqueue:deliver unchanged, and only subject the channel to flow control. It also reduces the distance between the two places that need to know which messages should be subject to flow control by moving the rabbit_flow invocation logic from the channel into rabbit_amqqueue.
-rw-r--r--src/rabbit_amqqueue.erl78
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_channel.erl8
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
4 files changed, 56 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 41e644f2..94a99a49 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -20,7 +20,7 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/3, reject/4]).
+ stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
@@ -120,6 +120,8 @@
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
{routing_result(), qpids()}).
+-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
@@ -425,39 +427,9 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
-deliver([], #delivery{mandatory = false, immediate = false}) ->
- %% /dev/null optimisation
- {routed, []};
-
-deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
- QPids = qpids(Qs),
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end),
- {routed, QPids};
+deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
-deliver(Qs, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
- QPids = qpids(Qs),
- {Success, _} =
- delegate:invoke(
- QPids, fun (QPid) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity)
- end),
- case {Mandatory, Immediate,
- lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
- ({_, false}, {_, H}) -> {true, H}
- end, {false, []}, Success)} of
- {true, _ , {false, []}} -> {unroutable, []};
- {_ , true, {_ , []}} -> {not_delivered, []};
- {_ , _ , {_ , R}} -> {routed, R}
- end.
+deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}).
@@ -549,6 +521,46 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
+deliver([], #delivery{mandatory = false, immediate = false}, _Flow) ->
+ %% /dev/null optimisation
+ {routed, []};
+
+deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
+ %% optimisation: when Mandatory = false and Immediate = false,
+ %% rabbit_amqqueue:deliver will deliver the message to the queue
+ %% process asynchronously, and return true, which means all the
+ %% QPids will always be returned. It is therefore safe to use a
+ %% fire-and-forget cast here and return the QPids - the semantics
+ %% is preserved. This scales much better than the non-immediate
+ %% case below.
+ QPids = qpids(Qs),
+ case Flow of
+ flow -> [credit_flow:send(QPid) || QPid <- QPids];
+ noflow -> ok
+ end,
+ delegate:invoke_no_result(
+ QPids, fun (QPid) ->
+ gen_server2:cast(QPid, {deliver, Delivery, Flow})
+ end),
+ {routed, QPids};
+
+deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate},
+ _Flow) ->
+ QPids = qpids(Qs),
+ {Success, _} =
+ delegate:invoke(
+ QPids, fun (QPid) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity)
+ end),
+ case {Mandatory, Immediate,
+ lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
+ ({_, false}, {_, H}) -> {true, H}
+ end, {false, []}, Success)} of
+ {true, _ , {false, []}} -> {unroutable, []};
+ {_ , true, {_ , []}} -> {not_delivered, []};
+ {_ , _ , {_ , R}} -> {routed, R}
+ end.
+
qpids(Qs) -> lists:append([[QPid | SPids] ||
#amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 44ead85e..af80905b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1045,10 +1045,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
ch_record_publisher(Sender),
- credit_flow:ack(Sender),
+ case Flow of
+ flow -> credit_flow:ack(Sender);
+ noflow -> ok
+ end,
noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, AckTags, ChPid}, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f6972b2f..680e486d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1346,17 +1346,11 @@ notify_limiter(Limiter, Acked) ->
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
- mandatory = Mandatory,
- immediate = Immediate,
msg_seq_no = MsgSeqNo},
QNames}, State) ->
{RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery),
+ rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
- case {Mandatory, Immediate} of
- {false, false} -> [credit_flow:send(QPid) || QPid <- DeliveredQPids];
- _ -> ok
- end,
State2 = process_routing_result(RoutingRes, DeliveredQPids,
XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 0e941018..06c5beac 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -207,9 +207,12 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- credit_flow:ack(Sender),
+ case Flow of
+ flow -> credit_flow:ack(Sender);
+ noflow -> ok
+ end,
noreply(maybe_enqueue_message(Delivery, true, State));
handle_cast({set_maximum_since_use, Age}, State) ->