diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-16 11:03:38 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-16 11:03:38 +0000 |
commit | b7aca5bbc2d9b7db8200f1939e641a9a7c258a80 (patch) | |
tree | 38e1ba99a5ef435317f4c9b69b3b5e022b1cfa61 /src/rabbit_amqqueue.erl | |
parent | 3dc1d7a21e47e708dc207f202333c9bf895d0828 (diff) | |
download | rabbitmq-server-b7aca5bbc2d9b7db8200f1939e641a9a7c258a80.tar.gz |
merge rabbit_router:deliver into rabbit_amqqueue:deliverbug24681
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 66 |
1 files changed, 50 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 96017df8..41e644f2 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -44,17 +44,17 @@ -ifdef(use_specs). --export_type([name/0, qmsg/0]). +-export_type([name/0, qmsg/0, routing_result/0]). -type(name() :: rabbit_types:r('queue')). - +-type(qpids() :: [pid()]). -type(qlen() :: rabbit_types:ok(non_neg_integer())). -type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return())). -type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}). -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). - +-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). -spec(start/0 :: () -> [name()]). @@ -69,7 +69,8 @@ -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | - rabbit_types:error('not_found')). + rabbit_types:error('not_found'); + ([name()]) -> [rabbit_types:amqqueue()]). -spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')). -spec(with_or_die/2 :: (name(), qfun(A)) -> A | rabbit_types:channel_exit()). @@ -117,12 +118,13 @@ rabbit_types:error('in_use') | rabbit_types:error('not_empty')). -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). --spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()). +-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). --spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) -> +-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). +-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). @@ -134,7 +136,7 @@ (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). +-spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit() | @@ -264,6 +266,10 @@ add_default_binding(#amqqueue{name = QueueName}) -> key = RoutingKey, args = []}). +lookup(Names) when is_list(Names) -> + %% Normally we'd call mnesia:dirty_read/1 here, but that is quite + %% expensive for reasons explained in rabbit_misc:dirty_read/1. + lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]); lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). @@ -419,14 +425,39 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). -deliver(QPid, Delivery = #delivery{immediate = true}) -> - gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); -deliver(QPid, Delivery = #delivery{mandatory = true}) -> - gen_server2:call(QPid, {deliver, Delivery}, infinity), - true; -deliver(QPid, Delivery) -> - gen_server2:cast(QPid, {deliver, Delivery}), - true. +deliver([], #delivery{mandatory = false, immediate = false}) -> + %% /dev/null optimisation + {routed, []}; + +deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) -> + %% optimisation: when Mandatory = false and Immediate = false, + %% rabbit_amqqueue:deliver will deliver the message to the queue + %% process asynchronously, and return true, which means all the + %% QPids will always be returned. It is therefore safe to use a + %% fire-and-forget cast here and return the QPids - the semantics + %% is preserved. This scales much better than the non-immediate + %% case below. + QPids = qpids(Qs), + delegate:invoke_no_result( + QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end), + {routed, QPids}; + +deliver(Qs, Delivery = #delivery{mandatory = Mandatory, + immediate = Immediate}) -> + QPids = qpids(Qs), + {Success, _} = + delegate:invoke( + QPids, fun (QPid) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity) + end), + case {Mandatory, Immediate, + lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; + ({_, false}, {_, H}) -> {true, H} + end, {false, []}, Success)} of + {true, _ , {false, []}} -> {unroutable, []}; + {_ , true, {_ , []}} -> {not_delivered, []}; + {_ , _ , {_ , R}} -> {routed, R} + end. requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). @@ -518,6 +549,9 @@ pseudo_queue(QueueName, Pid) -> slave_pids = [], mirror_nodes = undefined}. +qpids(Qs) -> lists:append([[QPid | SPids] || + #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]). + safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> rabbit_misc:with_exit_handler( |