summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_index.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-11-16 16:17:42 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-11-16 16:17:42 +0000
commitb49c9993afa69c9cc1399717f2e02e1143ab8d5f (patch)
treebdb7653511a7c802f87096bd463fc068e20f49e8 /src/rabbit_queue_index.erl
parent871857cdf7efc48781aab28cea6056180173ea54 (diff)
downloadrabbitmq-server-b49c9993afa69c9cc1399717f2e02e1143ab8d5f.tar.gz
Added queue_ttl upgrade
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r--src/rabbit_queue_index.erl94
1 files changed, 94 insertions, 0 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index bde9b3d3..d366ed36 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -36,6 +36,8 @@
publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
+-export([add_queue_ttl/0]).
+
-define(CLEAN_FILENAME, "clean.dot").
%%----------------------------------------------------------------------------
@@ -180,6 +182,8 @@
%%----------------------------------------------------------------------------
+-rabbit_upgrade({add_queue_ttl, []}).
+
-ifdef(use_specs).
-type(hdl() :: ('undefined' | any())).
@@ -226,6 +230,8 @@
-spec(recover/1 ::
([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}).
+-spec(add_queue_ttl/0 :: () -> 'ok').
+
-endif.
@@ -972,3 +978,91 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) ->
{{no_pub, no_del, ack}, 0};
journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) ->
{undefined, -1}.
+
+%%----------------------------------------------------------------------------
+%% upgrade
+%%----------------------------------------------------------------------------
+
+add_queue_ttl() ->
+ foreach_queue_index({fun add_queue_ttl_journal/1,
+ fun add_queue_ttl_segment/1}).
+
+add_queue_ttl_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Guid:?GUID_BYTES/binary, Rest/binary>>) ->
+ {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid,
+ expiry_to_binary(undefined)], Rest};
+add_queue_ttl_journal(_) ->
+ stop.
+
+add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary,
+ Rest/binary>>) ->
+ {[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest};
+add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+add_queue_ttl_segment(_) ->
+ stop.
+
+%%----------------------------------------------------------------------------
+
+foreach_queue_index(Funs) ->
+ QueuesDir = queues_dir(),
+ case file:list_dir(QueuesDir) of
+ {error, enoent} ->
+ ok;
+ {ok, Entries} ->
+ Queues = [ Dir || Entry <- Entries,
+ Dir <- [filename:join(QueuesDir, Entry)],
+ filelib:is_dir(Dir) ],
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ ok = gatherer:fork(Gatherer),
+ ok = worker_pool:submit_async(
+ fun () -> transform_queue(QueueDir, Gatherer, Funs) end)
+ end || QueueDir <- Queues],
+ empty = gatherer:out(Gatherer),
+ ok = gatherer:stop(Gatherer),
+ ok = rabbit_misc:unlink_and_capture_exit(Gatherer)
+ end.
+
+transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
+ ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
+ [ok = transform_file(filename:join(Dir, Seg), SegmentFun)
+ || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)],
+ ok = gatherer:finish(Gatherer).
+
+transform_file(Path, Fun) ->
+ PathTmp = Path ++ ".upgrade",
+ Size = filelib:file_size(Path),
+
+ {ok, PathTmpHdl} =
+ file_handle_cache:open(PathTmp, [exclusive | ?WRITE_MODE],
+ [{write_buffer, infinity}]),
+
+ {ok, PathHdl} =
+ file_handle_cache:open(Path, [{read_ahead, Size} | ?READ_MODE], []),
+ {ok, Content} = file_handle_cache:read(PathHdl, Size),
+ ok = file_handle_cache:close(PathHdl),
+
+ ok = drive_transform_fun(Fun, PathTmpHdl, Content),
+
+ ok = file_handle_cache:close(PathTmpHdl),
+ ok = file:rename(PathTmp, Path).
+
+drive_transform_fun(Fun, Hdl, Contents) ->
+ case Fun(Contents) of
+ stop ->
+ ok;
+ {Output, Contents1} ->
+ ok = file_handle_cache:append(Hdl, Output),
+ drive_transform_fun(Fun, Hdl, Contents1)
+ end.