summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-01-16 11:03:38 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-01-16 11:03:38 +0000
commitb7aca5bbc2d9b7db8200f1939e641a9a7c258a80 (patch)
tree38e1ba99a5ef435317f4c9b69b3b5e022b1cfa61 /src/rabbit_amqqueue.erl
parent3dc1d7a21e47e708dc207f202333c9bf895d0828 (diff)
downloadrabbitmq-server-b7aca5bbc2d9b7db8200f1939e641a9a7c258a80.tar.gz
merge rabbit_router:deliver into rabbit_amqqueue:deliverbug24681
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl66
1 files changed, 50 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 96017df8..41e644f2 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -44,17 +44,17 @@
-ifdef(use_specs).
--export_type([name/0, qmsg/0]).
+-export_type([name/0, qmsg/0, routing_result/0]).
-type(name() :: rabbit_types:r('queue')).
-
+-type(qpids() :: [pid()]).
-type(qlen() :: rabbit_types:ok(non_neg_integer())).
-type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return())).
-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}).
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-
+-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
-spec(start/0 :: () -> [name()]).
@@ -69,7 +69,8 @@
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
- rabbit_types:error('not_found')).
+ rabbit_types:error('not_found');
+ ([name()]) -> [rabbit_types:amqqueue()]).
-spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
-spec(with_or_die/2 ::
(name(), qfun(A)) -> A | rabbit_types:channel_exit()).
@@ -117,12 +118,13 @@
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
--spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()).
+-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
--spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
--spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) ->
+-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
+-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) ->
ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
@@ -134,7 +136,7 @@
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
+-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit() |
@@ -264,6 +266,10 @@ add_default_binding(#amqqueue{name = QueueName}) ->
key = RoutingKey,
args = []}).
+lookup(Names) when is_list(Names) ->
+ %% Normally we'd call mnesia:dirty_read/1 here, but that is quite
+ %% expensive for reasons explained in rabbit_misc:dirty_read/1.
+ lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]);
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
@@ -419,14 +425,39 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
-deliver(QPid, Delivery = #delivery{immediate = true}) ->
- gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
-deliver(QPid, Delivery = #delivery{mandatory = true}) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity),
- true;
-deliver(QPid, Delivery) ->
- gen_server2:cast(QPid, {deliver, Delivery}),
- true.
+deliver([], #delivery{mandatory = false, immediate = false}) ->
+ %% /dev/null optimisation
+ {routed, []};
+
+deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) ->
+ %% optimisation: when Mandatory = false and Immediate = false,
+ %% rabbit_amqqueue:deliver will deliver the message to the queue
+ %% process asynchronously, and return true, which means all the
+ %% QPids will always be returned. It is therefore safe to use a
+ %% fire-and-forget cast here and return the QPids - the semantics
+ %% is preserved. This scales much better than the non-immediate
+ %% case below.
+ QPids = qpids(Qs),
+ delegate:invoke_no_result(
+ QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end),
+ {routed, QPids};
+
+deliver(Qs, Delivery = #delivery{mandatory = Mandatory,
+ immediate = Immediate}) ->
+ QPids = qpids(Qs),
+ {Success, _} =
+ delegate:invoke(
+ QPids, fun (QPid) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity)
+ end),
+ case {Mandatory, Immediate,
+ lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
+ ({_, false}, {_, H}) -> {true, H}
+ end, {false, []}, Success)} of
+ {true, _ , {false, []}} -> {unroutable, []};
+ {_ , true, {_ , []}} -> {not_delivered, []};
+ {_ , _ , {_ , R}} -> {routed, R}
+ end.
requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}).
@@ -518,6 +549,9 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
+qpids(Qs) -> lists:append([[QPid | SPids] ||
+ #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
+
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->
rabbit_misc:with_exit_handler(