summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl45
1 files changed, 26 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c7391965..9820567c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -27,10 +27,12 @@
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+-export([store_queue/1]).
+
%% internal
-export([internal_declare/2, internal_delete/1,
- run_backing_queue/2, run_backing_queue_async/2,
+ run_backing_queue/3, run_backing_queue_async/3,
sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1,
emit_stats/1]).
@@ -141,10 +143,12 @@
rabbit_types:connection_exit() |
fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit())).
--spec(run_backing_queue/2 ::
- (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
--spec(run_backing_queue_async/2 ::
- (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
+-spec(run_backing_queue/3 ::
+ (pid(), atom(),
+ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
+-spec(run_backing_queue_async/3 ::
+ (pid(), atom(),
+ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
@@ -192,12 +196,13 @@ recover_durable_queues(DurableQueues) ->
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q = start_queue_process(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
+ Q = start_queue_process(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
exclusive_owner = Owner,
- pid = none}),
+ pid = none,
+ mirror_pids = []}),
case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
@@ -439,11 +444,11 @@ internal_delete(QueueName) ->
end
end).
-run_backing_queue(QPid, Fun) ->
- gen_server2:call(QPid, {run_backing_queue, Fun}, infinity).
+run_backing_queue(QPid, Mod, Fun) ->
+ gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity).
-run_backing_queue_async(QPid, Fun) ->
- gen_server2:cast(QPid, {run_backing_queue, Fun}).
+run_backing_queue_async(QPid, Mod, Fun) ->
+ gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
@@ -466,7 +471,8 @@ drop_expired(QPid) ->
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
+ #amqqueue{name = QueueName, pid = Pid,
+ mirror_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node]))
end,
@@ -483,11 +489,12 @@ delete_queue(QueueName) ->
rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
+ #amqqueue{name = QueueName,
+ durable = false,
auto_delete = false,
- arguments = [],
- pid = Pid}.
+ arguments = [],
+ pid = Pid,
+ mirror_pids = []}.
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->