From 37d931f6c39a9b9b633930e45b315b5779d42417 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 17 Nov 2010 14:37:01 +0000 Subject: refactoring --- src/rabbit_queue_index.erl | 61 ++++++++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index d366ed36..248c1fbc 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -351,35 +351,36 @@ recover(DurableQueues) -> DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || Queue <- DurableQueues ]), QueuesDir = queues_dir(), - Directories = case file:list_dir(QueuesDir) of - {ok, Entries} -> [ Entry || Entry <- Entries, - filelib:is_dir( - filename:join( - QueuesDir, Entry)) ]; - {error, enoent} -> [] - end, + QueueDirNames = all_queue_directory_names(QueuesDir), DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), {DurableQueueNames, DurableTerms} = lists:foldl( - fun (QueueDir, {DurableAcc, TermsAcc}) -> - case sets:is_element(QueueDir, DurableDirectories) of + fun (QueueDirName, {DurableAcc, TermsAcc}) -> + QueueDirPath = filename:join(QueuesDir, QueueDirName), + case sets:is_element(QueueDirName, DurableDirectories) of true -> TermsAcc1 = - case read_shutdown_terms( - filename:join(QueuesDir, QueueDir)) of + case read_shutdown_terms(QueueDirPath) of {error, _} -> TermsAcc; {ok, Terms} -> [Terms | TermsAcc] end, - {[dict:fetch(QueueDir, DurableDict) | DurableAcc], + {[dict:fetch(QueueDirName, DurableDict) | DurableAcc], TermsAcc1}; false -> - Dir = filename:join(queues_dir(), QueueDir), - ok = rabbit_misc:recursive_delete([Dir]), + ok = rabbit_misc:recursive_delete([QueueDirPath]), {DurableAcc, TermsAcc} end - end, {[], []}, Directories), + end, {[], []}, QueueDirNames), {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. +all_queue_directory_names(Dir) -> + case file:list_dir(Dir) of + {ok, Entries} -> [ Entry || Entry <- Entries, + filelib:is_dir( + filename:join(Dir, Entry)) ]; + {error, enoent} -> [] + end. + %%---------------------------------------------------------------------------- %% startup and shutdown %%---------------------------------------------------------------------------- @@ -1016,23 +1017,19 @@ add_queue_ttl_segment(_) -> foreach_queue_index(Funs) -> QueuesDir = queues_dir(), - case file:list_dir(QueuesDir) of - {error, enoent} -> - ok; - {ok, Entries} -> - Queues = [ Dir || Entry <- Entries, - Dir <- [filename:join(QueuesDir, Entry)], - filelib:is_dir(Dir) ], - {ok, Gatherer} = gatherer:start_link(), - [begin - ok = gatherer:fork(Gatherer), - ok = worker_pool:submit_async( - fun () -> transform_queue(QueueDir, Gatherer, Funs) end) - end || QueueDir <- Queues], - empty = gatherer:out(Gatherer), - ok = gatherer:stop(Gatherer), - ok = rabbit_misc:unlink_and_capture_exit(Gatherer) - end. + QueueDirNames = all_queue_directory_names(QueuesDir), + {ok, Gatherer} = gatherer:start_link(), + [begin + ok = gatherer:fork(Gatherer), + ok = worker_pool:submit_async( + fun () -> + transform_queue(filename:join(QueuesDir, QueueDirName), + Gatherer, Funs) + end) + end || QueueDirName <- QueueDirNames], + empty = gatherer:out(Gatherer), + ok = gatherer:stop(Gatherer), + ok = rabbit_misc:unlink_and_capture_exit(Gatherer). transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun), -- cgit v1.2.1