diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-11-16 16:17:42 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-11-16 16:17:42 +0000 |
commit | b49c9993afa69c9cc1399717f2e02e1143ab8d5f (patch) | |
tree | bdb7653511a7c802f87096bd463fc068e20f49e8 /src/rabbit_queue_index.erl | |
parent | 871857cdf7efc48781aab28cea6056180173ea54 (diff) | |
download | rabbitmq-server-b49c9993afa69c9cc1399717f2e02e1143ab8d5f.tar.gz |
Added queue_ttl upgrade
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r-- | src/rabbit_queue_index.erl | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index bde9b3d3..d366ed36 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -36,6 +36,8 @@ publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). +-export([add_queue_ttl/0]). + -define(CLEAN_FILENAME, "clean.dot"). %%---------------------------------------------------------------------------- @@ -180,6 +182,8 @@ %%---------------------------------------------------------------------------- +-rabbit_upgrade({add_queue_ttl, []}). + -ifdef(use_specs). -type(hdl() :: ('undefined' | any())). @@ -226,6 +230,8 @@ -spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}). +-spec(add_queue_ttl/0 :: () -> 'ok'). + -endif. @@ -972,3 +978,91 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) -> {{no_pub, no_del, ack}, 0}; journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) -> {undefined, -1}. + +%%---------------------------------------------------------------------------- +%% upgrade +%%---------------------------------------------------------------------------- + +add_queue_ttl() -> + foreach_queue_index({fun add_queue_ttl_journal/1, + fun add_queue_ttl_segment/1}). + +add_queue_ttl_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Guid:?GUID_BYTES/binary, Rest/binary>>) -> + {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid, + expiry_to_binary(undefined)], Rest}; +add_queue_ttl_journal(_) -> + stop. + +add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary, + Rest/binary>>) -> + {[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest}; +add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +add_queue_ttl_segment(_) -> + stop. + +%%---------------------------------------------------------------------------- + +foreach_queue_index(Funs) -> + QueuesDir = queues_dir(), + case file:list_dir(QueuesDir) of + {error, enoent} -> + ok; + {ok, Entries} -> + Queues = [ Dir || Entry <- Entries, + Dir <- [filename:join(QueuesDir, Entry)], + filelib:is_dir(Dir) ], + {ok, Gatherer} = gatherer:start_link(), + [begin + ok = gatherer:fork(Gatherer), + ok = worker_pool:submit_async( + fun () -> transform_queue(QueueDir, Gatherer, Funs) end) + end || QueueDir <- Queues], + empty = gatherer:out(Gatherer), + ok = gatherer:stop(Gatherer), + ok = rabbit_misc:unlink_and_capture_exit(Gatherer) + end. + +transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> + ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun), + [ok = transform_file(filename:join(Dir, Seg), SegmentFun) + || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)], + ok = gatherer:finish(Gatherer). + +transform_file(Path, Fun) -> + PathTmp = Path ++ ".upgrade", + Size = filelib:file_size(Path), + + {ok, PathTmpHdl} = + file_handle_cache:open(PathTmp, [exclusive | ?WRITE_MODE], + [{write_buffer, infinity}]), + + {ok, PathHdl} = + file_handle_cache:open(Path, [{read_ahead, Size} | ?READ_MODE], []), + {ok, Content} = file_handle_cache:read(PathHdl, Size), + ok = file_handle_cache:close(PathHdl), + + ok = drive_transform_fun(Fun, PathTmpHdl, Content), + + ok = file_handle_cache:close(PathTmpHdl), + ok = file:rename(PathTmp, Path). + +drive_transform_fun(Fun, Hdl, Contents) -> + case Fun(Contents) of + stop -> + ok; + {Output, Contents1} -> + ok = file_handle_cache:append(Hdl, Output), + drive_transform_fun(Fun, Hdl, Contents1) + end. |