summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-03-19 15:43:56 +0000
committerMatthias Radestock <matthias@lshift.net>2009-03-19 15:43:56 +0000
commit59b1fe0e022d010df8f3213bc734a59ef963fd6d (patch)
tree8ba8cd1b4f7785c3b6ca639a2883cd2c2356a015 /src/rabbit_amqqueue.erl
parent421f044f6cb6ca5551c143cef877ad7f4f9ceb81 (diff)
downloadrabbitmq-server-59b1fe0e022d010df8f3213bc734a59ef963fd6d.tar.gz
recover exchanges/bindings/queues in per-item transactionsbug20500
Because recovering them in large, single transactions is incredibly slow, with complexity that is far worse than linear in the number of entries we recover, presumably due to the way mnesia represents transaction-local storage.
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl39
1 files changed, 26 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2b9abb29..69c97dfe 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -122,19 +122,32 @@ recover() ->
recover_durable_queues() ->
Node = node(),
- %% TODO: use dirty ops instead
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(durable_queues),
- node(Pid) == Node]))
- end),
- Queues = lists:map(fun start_queue_process/1, R),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun store_queue/1, Queues),
- ok
- end).
+ lists:foreach(
+ fun (RecoveredQ) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client connected to
+ %% another node has deleted the queue (and possibly
+ %% re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(
+ durable_queues, RecoveredQ, read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true -> ok;
+ false -> exit(Q#amqqueue.pid, shutdown)
+ end
+ end,
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(durable_queues),
+ node(Pid) == Node]))
+ end)),
+ ok.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,