diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 104 |
1 files changed, 63 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 04457ed0..8c00c85c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,15 +26,16 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). --export([notify_down_all/2, limit_all/3]). +-export([basic_get/4, basic_consume/9, basic_cancel/4]). +-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). +-export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). --export([start_mirroring/1, stop_mirroring/1]). +-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, + cancel_sync_mirrors/1]). %% internal --export([internal_declare/2, internal_delete/2, run_backing_queue/3, +-export([internal_declare/2, internal_delete/1, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -143,25 +144,26 @@ -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). --spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> - ok_or_errors()). --spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> +-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()). +-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). --spec(basic_consume/7 :: - (rabbit_types:amqqueue(), boolean(), pid(), - rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any()) +-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean()) -> 'ok'). +-spec(basic_consume/9 :: + (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), + rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). --spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(resume/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). --spec(internal_delete/2 :: - (name(), pid()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit() | - fun (() -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit())). +-spec(internal_delete/1 :: + (name()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit() | + fun (() -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). @@ -174,6 +176,8 @@ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). -spec(stop_mirroring/1 :: (pid()) -> 'ok'). +-spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('not_mirrored')). +-spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}). -endif. @@ -258,7 +262,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName, QPid), + false -> TailFun = internal_delete(QueueName), fun () -> TailFun(), ExistingQ end end end @@ -303,6 +307,8 @@ add_default_binding(#amqqueue{name = QueueName}) -> key = RoutingKey, args = []}). +lookup([]) -> []; %% optimisation +lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation lookup(Names) when is_list(Names) -> %% Normally we'd call mnesia:dirty_read/1 here, but that is quite %% expensive for reasons explained in rabbit_misc:dirty_read/1. @@ -381,14 +387,10 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, RequiredArgs) -> - rabbit_misc:assert_args_equivalence( - Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]). + rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, + [Key || {Key, _Fun} <- args()]). check_declare_arguments(QueueName, Args) -> - Checks = [{<<"x-expires">>, fun check_expires_arg/2}, - {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, - {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, - {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], [case rabbit_misc:table_lookup(Args, Key) of undefined -> ok; TypeVal -> case Fun(TypeVal, Args) of @@ -399,9 +401,16 @@ check_declare_arguments(QueueName, Args) -> [Key, rabbit_misc:rs(QueueName), Error]) end - end || {Key, Fun} <- Checks], + end || {Key, Fun} <- args()], ok. +args() -> + [{<<"x-expires">>, fun check_expires_arg/2}, + {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, + {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, + {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, + {<<"x-max-length">>, fun check_max_length_arg/2}]. + check_string_arg({longstr, _}, _Args) -> ok; check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. @@ -411,6 +420,13 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. +check_max_length_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_negative, Val}}; + Error -> Error + end. + check_expires_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val == 0 -> {error, {value_zero, Val}}; @@ -523,16 +539,19 @@ notify_down_all(QPids, ChPid) -> Bads1 -> {error, Bads1} end. -limit_all(QPids, ChPid, Limiter) -> - delegate:cast(QPids, {limit, ChPid, Limiter}). +activate_limit_all(QPids, ChPid) -> + delegate:cast(QPids, {activate_limit, ChPid}). + +credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> + delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}). -basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - delegate:call(QPid, {basic_get, ChPid, NoAck}). +basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> + delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, - ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate:call(QPid, {basic_consume, NoAck, ChPid, - Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). +basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) -> + delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). @@ -554,7 +573,7 @@ notify_sent_queue_down(QPid) -> erase({consumer_credit_to, QPid}), ok. -unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}). +resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}). @@ -570,7 +589,7 @@ internal_delete1(QueueName) -> %% after the transaction. rabbit_binding:remove_for_destination(QueueName). -internal_delete(QueueName, QPid) -> +internal_delete(QueueName) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of @@ -580,8 +599,7 @@ internal_delete(QueueName, QPid) -> fun() -> ok = T(), ok = rabbit_event:notify(queue_deleted, - [{pid, QPid}, - {name, QueueName}]) + [{name, QueueName}]) end end end). @@ -616,10 +634,13 @@ set_maximum_since_use(QPid, Age) -> start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring). stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). +sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). +cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = - qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} || + qlc:e(qlc:q([{QName, delete_queue(QName)} || #amqqueue{name = QName, pid = Pid, slave_pids = []} <- mnesia:table(rabbit_queue), @@ -632,10 +653,9 @@ on_node_down(Node) -> fun () -> T(), lists:foreach( - fun({QName, QPid}) -> + fun(QName) -> ok = rabbit_event:notify(queue_deleted, - [{pid, QPid}, - {name, QName}]) + [{name, QName}]) end, Qs) end end). @@ -693,6 +713,8 @@ deliver(Qs, Delivery, _Flow) -> R -> {routed, [QPid || {QPid, ok} <- R]} end. +qpids([]) -> {[], []}; %% optimisation +qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> {[QPid], SPids}; %% opt qpids(Qs) -> {MPids, SPids} = lists:foldl(fun (#amqqueue{pid = QPid, slave_pids = SPids}, {MPidAcc, SPidAcc}) -> |