summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl42
1 files changed, 17 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b2473f91..4a20a1bc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -60,7 +60,7 @@
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
--type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
+-type(routing_result() :: 'routed' | 'unroutable').
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
-spec(start/0 :: () -> [name()]).
@@ -650,18 +650,17 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
-deliver([], #delivery{mandatory = false, immediate = false}, _Flow) ->
+deliver([], #delivery{mandatory = false}, _Flow) ->
%% /dev/null optimisation
{routed, []};
-deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
+deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
+ %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver
+ %% will deliver the message to the queue process asynchronously,
+ %% and return true, which means all the QPids will always be
+ %% returned. It is therefore safe to use a fire-and-forget cast
+ %% here and return the QPids - the semantics is preserved. This
+ %% scales much better than the case below.
QPids = qpids(Qs),
case Flow of
flow -> [credit_flow:send(QPid) || QPid <- QPids];
@@ -673,21 +672,14 @@ deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
end),
{routed, QPids};
-deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate},
- _Flow) ->
- QPids = qpids(Qs),
- {Success, _} =
- delegate:invoke(
- QPids, fun (QPid) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity)
- end),
- case {Mandatory, Immediate,
- lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
- ({_, false}, {_, H}) -> {true, H}
- end, {false, []}, Success)} of
- {true, _ , {false, []}} -> {unroutable, []};
- {_ , true, {_ , []}} -> {not_delivered, []};
- {_ , _ , {_ , R}} -> {routed, R}
+deliver(Qs, Delivery, _Flow) ->
+ case delegate:invoke(
+ qpids(Qs), fun (QPid) ->
+ ok = gen_server2:call(QPid, {deliver, Delivery},
+ infinity)
+ end) of
+ {[], _} -> {unroutable, []};
+ {R , _} -> {routed, [QPid || {QPid, ok} <- R]}
end.
qpids(Qs) -> lists:append([[QPid | SPids] ||