diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 45 |
1 files changed, 26 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c7391965..9820567c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -27,10 +27,12 @@ -export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). +-export([store_queue/1]). + %% internal -export([internal_declare/2, internal_delete/1, - run_backing_queue/2, run_backing_queue_async/2, + run_backing_queue/3, run_backing_queue_async/3, sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1, emit_stats/1]). @@ -141,10 +143,12 @@ rabbit_types:connection_exit() | fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit())). --spec(run_backing_queue/2 :: - (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(run_backing_queue_async/2 :: - (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). +-spec(run_backing_queue/3 :: + (pid(), atom(), + (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). +-spec(run_backing_queue_async/3 :: + (pid(), atom(), + (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). @@ -192,12 +196,13 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q = start_queue_process(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, + Q = start_queue_process(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, exclusive_owner = Owner, - pid = none}), + pid = none, + mirror_pids = []}), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 @@ -439,11 +444,11 @@ internal_delete(QueueName) -> end end). -run_backing_queue(QPid, Fun) -> - gen_server2:call(QPid, {run_backing_queue, Fun}, infinity). +run_backing_queue(QPid, Mod, Fun) -> + gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity). -run_backing_queue_async(QPid, Fun) -> - gen_server2:cast(QPid, {run_backing_queue, Fun}). +run_backing_queue_async(QPid, Mod, Fun) -> + gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). sync_timeout(QPid) -> gen_server2:cast(QPid, sync_timeout). @@ -466,7 +471,8 @@ drop_expired(QPid) -> on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} + #amqqueue{name = QueueName, pid = Pid, + mirror_pids = []} <- mnesia:table(rabbit_queue), node(Pid) == Node])) end, @@ -483,11 +489,12 @@ delete_queue(QueueName) -> rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> - #amqqueue{name = QueueName, - durable = false, + #amqqueue{name = QueueName, + durable = false, auto_delete = false, - arguments = [], - pid = Pid}. + arguments = [], + pid = Pid, + mirror_pids = []}. safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> |