diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-02-11 12:51:50 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-02-11 12:51:50 +0000 |
commit | 99ac15fbc28d60adc0d38899a5a7f770530ca466 (patch) | |
tree | 659e5593445df386e9582ba542c2abf18027ffa0 | |
parent | 340ae1fdefe6b7b9558292ca1e7ff43ecde06ac4 (diff) | |
download | rabbitmq-server-99ac15fbc28d60adc0d38899a5a7f770530ca466.tar.gz |
Upgrade messages
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 3 | ||||
-rw-r--r-- | src/rabbit.erl | 1 | ||||
-rw-r--r-- | src/rabbit_msg_file.erl | 68 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 62 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 19 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 15 |
6 files changed, 138 insertions, 30 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index accb2c0e..52ffd413 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -65,3 +65,6 @@ -spec(idle_timeout/1 :: (state()) -> state()). -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). +-spec(transform_storage/1 :: + (fun ((binary()) -> (rabbit_types:ok_or_error2(any(), any())))) -> + non_neg_integer()). diff --git a/src/rabbit.erl b/src/rabbit.erl index c6661d39..9e241e80 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -38,6 +38,7 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, + {requires, file_handle_cache}, {enables, external_infrastructure}]}). -rabbit_boot_step({file_handle_cache, diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index cfea4982..ad87ee16 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -16,7 +16,7 @@ -module(rabbit_msg_file). --export([append/3, read/2, scan/2]). +-export([append/3, read/2, scan/2, scan/3]). %%---------------------------------------------------------------------------- @@ -48,6 +48,9 @@ -spec(scan/2 :: (io_device(), file_size()) -> {'ok', [{rabbit_guid:guid(), msg_size(), position()}], position()}). +-spec(scan/3 :: (io_device(), file_size(), + fun ((rabbit_guid:guid(), msg_size(), position(), binary()) -> any())) -> + {'ok', [any()], position()}). -endif. @@ -79,43 +82,50 @@ read(FileHdl, TotalSize) -> KO -> KO end. +scan_fun(Guid, TotalSize, Offset, _Msg) -> + {Guid, TotalSize, Offset}. + scan(FileHdl, FileSize) when FileSize >= 0 -> - scan(FileHdl, FileSize, <<>>, 0, [], 0). + scan(FileHdl, FileSize, <<>>, 0, [], 0, fun scan_fun/4). + +scan(FileHdl, FileSize, Fun) when FileSize >= 0 -> + scan(FileHdl, FileSize, <<>>, 0, [], 0, Fun). -scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) -> +scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset, _Fun) -> {ok, Acc, ScanOffset}; -scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) -> +scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset, Fun) -> Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]), case file_handle_cache:read(FileHdl, Read) of {ok, Data1} -> {Data2, Acc1, ScanOffset1} = - scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset), + scanner(<<Data/binary, Data1/binary>>, Acc, ScanOffset, Fun), ReadOffset1 = ReadOffset + size(Data1), - scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1); + scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1, Fun); _KO -> {ok, Acc, ScanOffset} end. -scan(<<>>, Acc, Offset) -> - {<<>>, Acc, Offset}; -scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) -> - {<<>>, Acc, Offset}; %% Nothing to do other than stop. -scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, - WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) -> - TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, - case WriteMarker of - ?WRITE_OK_MARKER -> - %% Here we take option 5 from - %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in - %% which we read the Guid as a number, and then convert it - %% back to a binary in order to work around bugs in - %% Erlang's GC. - <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> = - <<GuidAndMsg:Size/binary>>, - <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, - scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize); - _ -> - scan(Rest, Acc, Offset + TotalSize) - end; -scan(Data, Acc, Offset) -> - {Data, Acc, Offset}. +scanner(<<>>, Acc, Offset, _Fun) -> + {<<>>, Acc, Offset}; +scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset, _Fun) -> + {<<>>, Acc, Offset}; %% Nothing to do other than stop. +scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, + WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset, Fun) -> + TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, + case WriteMarker of + ?WRITE_OK_MARKER -> + %% Here we take option 5 from + %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in + %% which we read the Guid as a number, and then convert it + %% back to a binary in order to work around bugs in + %% Erlang's GC. + <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> = + <<GuidAndMsg:Size/binary>>, + <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, + scanner(Rest, [Fun(Guid, TotalSize, Offset, Msg) | Acc], + Offset + TotalSize, Fun); + _ -> + scanner(Rest, Acc, Offset + TotalSize, Fun) + end; +scanner(Data, Acc, Offset, _Fun) -> + {Data, Acc, Offset}. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e9c356e1..bd8d61e8 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -26,16 +26,20 @@ -export([sync/1, set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal +-export([transform_dir/3, force_recovery/2]). %% upgrade + -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). %%---------------------------------------------------------------------------- -include("rabbit_msg_store.hrl"). +-include_lib("kernel/include/file.hrl"). -define(SYNC_INTERVAL, 5). %% milliseconds -define(CLEAN_FILENAME, "clean.dot"). -define(FILE_SUMMARY_FILENAME, "file_summary.ets"). +-define(TRANSFORM_TMP, "transform_tmp"). -define(BINARY_MODE, [raw, binary]). -define(READ_MODE, [read]). @@ -160,6 +164,10 @@ -spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> deletion_thunk()). -spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()). +-spec(force_recovery/2 :: (file:filename(), server()) -> 'ok'). +-spec(transform_dir/3 :: (file:filename(), server(), + fun ((binary())->({'ok', msg()} | {error, any()}))) -> + non_neg_integer()). -endif. @@ -1956,3 +1964,57 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {got, FinalOffsetZ}, {destination, Destination}]} end. + +force_recovery(BaseDir, Server) -> + Dir = filename:join(BaseDir, atom_to_list(Server)), + file:delete(filename:join(Dir, ?CLEAN_FILENAME)), + [file:delete(filename:join(Dir, File)) || + File <- list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP)], + ok. + +transform_dir(BaseDir, Server, TransformFun) -> + Dir = filename:join(BaseDir, atom_to_list(Server)), + TmpDir = filename:join(Dir, ?TRANSFORM_TMP), + case filelib:is_dir(TmpDir) of + true -> throw({error, previously_failed_transform}); + false -> + Count = lists:sum( + [transform_msg_file(filename:join(Dir, File), + filename:join(TmpDir, File), + TransformFun) || + File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)]), + [file:delete(filename:join(Dir, File)) || + File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)], + [file:copy(filename:join(TmpDir, File), filename:join(Dir, File)) || + File <- list_sorted_file_names(TmpDir, ?FILE_EXTENSION)], + [file:delete(filename:join(TmpDir, File)) || + File <- list_sorted_file_names(TmpDir, ?FILE_EXTENSION)], + ok = file:del_dir(TmpDir), + Count + end. + +transform_msg_file(FileOld, FileNew, TransformFun) -> + rabbit_misc:ensure_parent_dirs_exist(FileNew), + {ok, #file_info{size=Size}} = file:read_file_info(FileOld), + {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []), + {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write], + [{write_buffer, + ?HANDLE_CACHE_BUFFER_SIZE}]), + {ok, Acc, Size} = + rabbit_msg_file:scan( + RefOld, Size, + fun(Guid, _Size, _Offset, BinMsg) -> + case TransformFun(BinMsg) of + {ok, MsgNew} -> + rabbit_msg_file:append(RefNew, Guid, MsgNew), + 1; + {error, Reason} -> + error_logger:error_msg("Message transform failed: ~p~n", + [Reason]), + 0 + end + end), + file_handle_cache:close(RefOld), + file_handle_cache:close(RefNew), + lists:sum(Acc). + diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 68b88b3e..f4e27cc8 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -25,6 +25,7 @@ -rabbit_upgrade({add_ip_to_listener, []}). -rabbit_upgrade({internal_exchanges, []}). -rabbit_upgrade({user_to_internal_user, [hash_passwords]}). +-rabbit_upgrade({multiple_routing_keys, []}). %% ------------------------------------------------------------------- @@ -35,6 +36,7 @@ -spec(add_ip_to_listener/0 :: () -> 'ok'). -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). +-spec(multiple_routing_keys/0 :: () -> 'ok'). -endif. @@ -101,3 +103,20 @@ mnesia(TableName, Fun, FieldList, NewRecordName) -> {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList, NewRecordName), ok. + +%%-------------------------------------------------------------------- + +multiple_routing_keys() -> + _UpgradeMsgCount = rabbit_variable_queue:transform_storage( + fun (BinMsg) -> + case binary_to_term(BinMsg) of + {basic_message, ExchangeName, Routing_Key, Content, Guid, + Persistent} -> + {ok, {basic_message, ExchangeName, [Routing_Key], Content, + Guid, Persistent}}; + _ -> + {error, corrupt_message} + end + end), + ok. + diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7142d560..f2176c0e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1]). + status/1, transform_storage/1]). -export([start/1, stop/0]). @@ -1801,3 +1801,16 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> push_betas_to_deltas( Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1) end. + +%%---------------------------------------------------------------------------- +%% Upgrading +%%---------------------------------------------------------------------------- + +%% Assumes message store is not running +transform_storage(TransformFun) -> + transform_store(?PERSISTENT_MSG_STORE, TransformFun) + + transform_store(?TRANSIENT_MSG_STORE, TransformFun). + +transform_store(Store, TransformFun) -> + rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), + rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). |