summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-12 20:54:33 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-12 20:54:33 +0000
commitc1fc51259dd4f61a29836858e3c07eed9c9d2ba7 (patch)
treeae05e44dedc9bcaf2865a9f0ac8c44252f3ff9bf /src/rabbit_amqqueue.erl
parentcd012caed4385b393ad612aabc12107419e13e15 (diff)
parent61bfc7972b99bf2f449579191e800da69d177910 (diff)
downloadrabbitmq-server-c1fc51259dd4f61a29836858e3c07eed9c9d2ba7.tar.gz
Merging default into bug23554
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl46
1 files changed, 26 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6e5aae27..4ef9750c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -18,8 +18,8 @@
-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
- maybe_run_queue_via_backing_queue/2,
- maybe_run_queue_via_backing_queue_async/2,
+ maybe_run_queue_via_backing_queue/3,
+ maybe_run_queue_via_backing_queue_async/3,
sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-export([pseudo_queue/2]).
@@ -33,6 +33,7 @@
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+-export([store_queue/1]).
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -140,10 +141,10 @@
rabbit_types:connection_exit() |
fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit())).
--spec(maybe_run_queue_via_backing_queue/2 ::
- (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
--spec(maybe_run_queue_via_backing_queue_async/2 ::
- (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue/3 ::
+ (pid(), atom(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue_async/3 ::
+ (pid(), atom(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
@@ -191,12 +192,13 @@ recover_durable_queues(DurableQueues) ->
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q = start_queue_process(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
+ Q = start_queue_process(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
exclusive_owner = Owner,
- pid = none}),
+ pid = none,
+ mirror_pids = []}),
case gen_server2:call(Q#amqqueue.pid, {init, false}) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
@@ -450,11 +452,13 @@ internal_delete(QueueName) ->
end
end).
-maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
-maybe_run_queue_via_backing_queue_async(QPid, Fun) ->
- gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}).
+maybe_run_queue_via_backing_queue(QPid, Mod, Fun) ->
+ gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun},
+ infinity).
+
+maybe_run_queue_via_backing_queue_async(QPid, Mod, Fun) ->
+ gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun}).
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
@@ -477,7 +481,8 @@ drop_expired(QPid) ->
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
+ #amqqueue{name = QueueName, pid = Pid,
+ mirror_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node]))
end,
@@ -494,11 +499,12 @@ delete_queue(QueueName) ->
rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
+ #amqqueue{name = QueueName,
+ durable = false,
auto_delete = false,
- arguments = [],
- pid = Pid}.
+ arguments = [],
+ pid = Pid,
+ mirror_pids = []}.
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->