diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-11-18 11:47:19 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-11-18 11:47:19 +0000 |
commit | 631f130edc3f8fbb704e1c1390925b8d7009b103 (patch) | |
tree | a34928e810f573309608bc75ce0a3fa6f47317f6 | |
parent | 7e279bdbe449573b067ba3246d08b4eb53656003 (diff) | |
parent | 37d931f6c39a9b9b633930e45b315b5779d42417 (diff) | |
download | rabbitmq-server-631f130edc3f8fbb704e1c1390925b8d7009b103.tar.gz |
Merged bug 23424 into default
-rw-r--r-- | src/rabbit_queue_index.erl | 121 |
1 files changed, 106 insertions, 15 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index bde9b3d3..248c1fbc 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -36,6 +36,8 @@ publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). +-export([add_queue_ttl/0]). + -define(CLEAN_FILENAME, "clean.dot"). %%---------------------------------------------------------------------------- @@ -180,6 +182,8 @@ %%---------------------------------------------------------------------------- +-rabbit_upgrade({add_queue_ttl, []}). + -ifdef(use_specs). -type(hdl() :: ('undefined' | any())). @@ -226,6 +230,8 @@ -spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}). +-spec(add_queue_ttl/0 :: () -> 'ok'). + -endif. @@ -345,35 +351,36 @@ recover(DurableQueues) -> DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || Queue <- DurableQueues ]), QueuesDir = queues_dir(), - Directories = case file:list_dir(QueuesDir) of - {ok, Entries} -> [ Entry || Entry <- Entries, - filelib:is_dir( - filename:join( - QueuesDir, Entry)) ]; - {error, enoent} -> [] - end, + QueueDirNames = all_queue_directory_names(QueuesDir), DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), {DurableQueueNames, DurableTerms} = lists:foldl( - fun (QueueDir, {DurableAcc, TermsAcc}) -> - case sets:is_element(QueueDir, DurableDirectories) of + fun (QueueDirName, {DurableAcc, TermsAcc}) -> + QueueDirPath = filename:join(QueuesDir, QueueDirName), + case sets:is_element(QueueDirName, DurableDirectories) of true -> TermsAcc1 = - case read_shutdown_terms( - filename:join(QueuesDir, QueueDir)) of + case read_shutdown_terms(QueueDirPath) of {error, _} -> TermsAcc; {ok, Terms} -> [Terms | TermsAcc] end, - {[dict:fetch(QueueDir, DurableDict) | DurableAcc], + {[dict:fetch(QueueDirName, DurableDict) | DurableAcc], TermsAcc1}; false -> - Dir = filename:join(queues_dir(), QueueDir), - ok = rabbit_misc:recursive_delete([Dir]), + ok = rabbit_misc:recursive_delete([QueueDirPath]), {DurableAcc, TermsAcc} end - end, {[], []}, Directories), + end, {[], []}, QueueDirNames), {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. +all_queue_directory_names(Dir) -> + case file:list_dir(Dir) of + {ok, Entries} -> [ Entry || Entry <- Entries, + filelib:is_dir( + filename:join(Dir, Entry)) ]; + {error, enoent} -> [] + end. + %%---------------------------------------------------------------------------- %% startup and shutdown %%---------------------------------------------------------------------------- @@ -972,3 +979,87 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) -> {{no_pub, no_del, ack}, 0}; journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) -> {undefined, -1}. + +%%---------------------------------------------------------------------------- +%% upgrade +%%---------------------------------------------------------------------------- + +add_queue_ttl() -> + foreach_queue_index({fun add_queue_ttl_journal/1, + fun add_queue_ttl_segment/1}). + +add_queue_ttl_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Guid:?GUID_BYTES/binary, Rest/binary>>) -> + {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid, + expiry_to_binary(undefined)], Rest}; +add_queue_ttl_journal(_) -> + stop. + +add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary, + Rest/binary>>) -> + {[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest}; +add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +add_queue_ttl_segment(_) -> + stop. + +%%---------------------------------------------------------------------------- + +foreach_queue_index(Funs) -> + QueuesDir = queues_dir(), + QueueDirNames = all_queue_directory_names(QueuesDir), + {ok, Gatherer} = gatherer:start_link(), + [begin + ok = gatherer:fork(Gatherer), + ok = worker_pool:submit_async( + fun () -> + transform_queue(filename:join(QueuesDir, QueueDirName), + Gatherer, Funs) + end) + end || QueueDirName <- QueueDirNames], + empty = gatherer:out(Gatherer), + ok = gatherer:stop(Gatherer), + ok = rabbit_misc:unlink_and_capture_exit(Gatherer). + +transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> + ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun), + [ok = transform_file(filename:join(Dir, Seg), SegmentFun) + || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)], + ok = gatherer:finish(Gatherer). + +transform_file(Path, Fun) -> + PathTmp = Path ++ ".upgrade", + Size = filelib:file_size(Path), + + {ok, PathTmpHdl} = + file_handle_cache:open(PathTmp, [exclusive | ?WRITE_MODE], + [{write_buffer, infinity}]), + + {ok, PathHdl} = + file_handle_cache:open(Path, [{read_ahead, Size} | ?READ_MODE], []), + {ok, Content} = file_handle_cache:read(PathHdl, Size), + ok = file_handle_cache:close(PathHdl), + + ok = drive_transform_fun(Fun, PathTmpHdl, Content), + + ok = file_handle_cache:close(PathTmpHdl), + ok = file:rename(PathTmp, Path). + +drive_transform_fun(Fun, Hdl, Contents) -> + case Fun(Contents) of + stop -> + ok; + {Output, Contents1} -> + ok = file_handle_cache:append(Hdl, Output), + drive_transform_fun(Fun, Hdl, Contents1) + end. |