summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <watson.timothy@gmail.com>2014-01-09 12:23:11 +0000
committerTim Watson <watson.timothy@gmail.com>2014-01-09 12:23:11 +0000
commitd400309403571c32c6beeb2163feccf42e4f2bf6 (patch)
treefed5d5ff9690e21acdb5a230edfd1729ea1ea925
parent2d06b5ab148709540832459b54773b950d5022ed (diff)
downloadrabbitmq-server-d400309403571c32c6beeb2163feccf42e4f2bf6.tar.gz
Better handling of file system directories during shutdown/recovery
Ensure the ?queues? directory exists before attempting to sync the recovery terms dets table. Drive qi recovery off the durable queues we?re passed, rather than relying on the existence of queue dirs, which are created lazily.
-rw-r--r--src/rabbit_queue_index.erl42
-rw-r--r--src/rabbit_recovery_terms.erl1
2 files changed, 22 insertions, 21 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 543bd45a..4a809f43 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -369,31 +369,31 @@ recover(DurableQueues) ->
DirName = rabbit_amqqueue:queue_name_to_dir_name(Queue),
{DirName, Queue}
end || Queue <- DurableQueues ]),
+
QueuesDir = queues_dir(),
- QueueDirNames = all_queue_directory_names(QueuesDir),
- DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
{DurableQueueNames, DurableTerms} =
- lists:foldl(
- fun (QueueDirName, {DurableAcc, TermsAcc}) ->
+ dict:fold(
+ fun (QueueDirName, QueueName, {DurableAcc, TermsAcc}) ->
QueueDirPath = filename:join(QueuesDir, QueueDirName),
- case sets:is_element(QueueDirName, DurableDirectories) of
- true ->
- TermsAcc1 =
- case rabbit_recovery_terms:read(
- QueueDirPath) of
- {error, _} -> TermsAcc;
- {ok, Terms} -> [{QueueDirPath, Terms} |
- TermsAcc]
- end,
- {[dict:fetch(QueueDirName, DurableDict) | DurableAcc],
- TermsAcc1};
- false ->
- ok = rabbit_file:recursive_delete([QueueDirPath]),
- {DurableAcc, TermsAcc}
- end
- end, {[], []}, QueueDirNames),
+ TermsAcc1 =
+ case rabbit_recovery_terms:read(QueueDirPath) of
+ {error, _} -> TermsAcc;
+ {ok, Terms} -> [{QueueDirPath, Terms} | TermsAcc]
+ end,
+ {[QueueName | DurableAcc], TermsAcc1}
+ end, {[], []}, DurableDict),
+
+ %% Any queue directory we've not been asked to recover is considered garbage
+ lists:map(
+ fun(QueueDir) ->
+ case dict:is_key(filename:basename(QueueDir),
+ DurableDict) of
+ true -> ok;
+ false -> ok = rabbit_file:recursive_delete([QueueDir])
+ end
+ end, all_queue_directory_names(QueuesDir)),
+
rabbit_recovery_terms:clear(),
- rabbit_recovery_terms:flush(),
{DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
all_queue_directory_names(Dir) ->
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index 42d23500..3d58415d 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -138,6 +138,7 @@ flush() ->
create_table() ->
File = dets_filename(),
+ rabbit_file:ensure_dir(dets_filename()),
{ok, _} = dets:open_file(?MODULE, [{file, File},
{ram_file, true},
{auto_save, infinity}]).