diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 42 |
1 files changed, 17 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b2473f91..4a20a1bc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -60,7 +60,7 @@ -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). +-type(routing_result() :: 'routed' | 'unroutable'). -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). -spec(start/0 :: () -> [name()]). @@ -650,18 +650,17 @@ pseudo_queue(QueueName, Pid) -> slave_pids = [], mirror_nodes = undefined}. -deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> +deliver([], #delivery{mandatory = false}, _Flow) -> %% /dev/null optimisation {routed, []}; -deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> - %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver will deliver the message to the queue - %% process asynchronously, and return true, which means all the - %% QPids will always be returned. It is therefore safe to use a - %% fire-and-forget cast here and return the QPids - the semantics - %% is preserved. This scales much better than the non-immediate - %% case below. +deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> + %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver + %% will deliver the message to the queue process asynchronously, + %% and return true, which means all the QPids will always be + %% returned. It is therefore safe to use a fire-and-forget cast + %% here and return the QPids - the semantics is preserved. This + %% scales much better than the case below. QPids = qpids(Qs), case Flow of flow -> [credit_flow:send(QPid) || QPid <- QPids]; @@ -673,21 +672,14 @@ deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> end), {routed, QPids}; -deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}, - _Flow) -> - QPids = qpids(Qs), - {Success, _} = - delegate:invoke( - QPids, fun (QPid) -> - gen_server2:call(QPid, {deliver, Delivery}, infinity) - end), - case {Mandatory, Immediate, - lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; - ({_, false}, {_, H}) -> {true, H} - end, {false, []}, Success)} of - {true, _ , {false, []}} -> {unroutable, []}; - {_ , true, {_ , []}} -> {not_delivered, []}; - {_ , _ , {_ , R}} -> {routed, R} +deliver(Qs, Delivery, _Flow) -> + case delegate:invoke( + qpids(Qs), fun (QPid) -> + ok = gen_server2:call(QPid, {deliver, Delivery}, + infinity) + end) of + {[], _} -> {unroutable, []}; + {R , _} -> {routed, [QPid || {QPid, ok} <- R]} end. qpids(Qs) -> lists:append([[QPid | SPids] || |