summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-02-11 12:51:50 +0000
committerEmile Joubert <emile@rabbitmq.com>2011-02-11 12:51:50 +0000
commit99ac15fbc28d60adc0d38899a5a7f770530ca466 (patch)
tree659e5593445df386e9582ba542c2abf18027ffa0
parent340ae1fdefe6b7b9558292ca1e7ff43ecde06ac4 (diff)
downloadrabbitmq-server-99ac15fbc28d60adc0d38899a5a7f770530ca466.tar.gz
Upgrade messages
-rw-r--r--include/rabbit_backing_queue_spec.hrl3
-rw-r--r--src/rabbit.erl1
-rw-r--r--src/rabbit_msg_file.erl68
-rw-r--r--src/rabbit_msg_store.erl62
-rw-r--r--src/rabbit_upgrade_functions.erl19
-rw-r--r--src/rabbit_variable_queue.erl15
6 files changed, 138 insertions, 30 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index accb2c0e..52ffd413 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -65,3 +65,6 @@
-spec(idle_timeout/1 :: (state()) -> state()).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
+-spec(transform_storage/1 ::
+ (fun ((binary()) -> (rabbit_types:ok_or_error2(any(), any())))) ->
+ non_neg_integer()).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c6661d39..9e241e80 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -38,6 +38,7 @@
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
+ {requires, file_handle_cache},
{enables, external_infrastructure}]}).
-rabbit_boot_step({file_handle_cache,
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index cfea4982..ad87ee16 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -16,7 +16,7 @@
-module(rabbit_msg_file).
--export([append/3, read/2, scan/2]).
+-export([append/3, read/2, scan/2, scan/3]).
%%----------------------------------------------------------------------------
@@ -48,6 +48,9 @@
-spec(scan/2 :: (io_device(), file_size()) ->
{'ok', [{rabbit_guid:guid(), msg_size(), position()}],
position()}).
+-spec(scan/3 :: (io_device(), file_size(),
+ fun ((rabbit_guid:guid(), msg_size(), position(), binary()) -> any())) ->
+ {'ok', [any()], position()}).
-endif.
@@ -79,43 +82,50 @@ read(FileHdl, TotalSize) ->
KO -> KO
end.
+scan_fun(Guid, TotalSize, Offset, _Msg) ->
+ {Guid, TotalSize, Offset}.
+
scan(FileHdl, FileSize) when FileSize >= 0 ->
- scan(FileHdl, FileSize, <<>>, 0, [], 0).
+ scan(FileHdl, FileSize, <<>>, 0, [], 0, fun scan_fun/4).
+
+scan(FileHdl, FileSize, Fun) when FileSize >= 0 ->
+ scan(FileHdl, FileSize, <<>>, 0, [], 0, Fun).
-scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) ->
+scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset, _Fun) ->
{ok, Acc, ScanOffset};
-scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) ->
+scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset, Fun) ->
Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]),
case file_handle_cache:read(FileHdl, Read) of
{ok, Data1} ->
{Data2, Acc1, ScanOffset1} =
- scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset),
+ scanner(<<Data/binary, Data1/binary>>, Acc, ScanOffset, Fun),
ReadOffset1 = ReadOffset + size(Data1),
- scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1);
+ scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1, Fun);
_KO ->
{ok, Acc, ScanOffset}
end.
-scan(<<>>, Acc, Offset) ->
- {<<>>, Acc, Offset};
-scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) ->
- {<<>>, Acc, Offset}; %% Nothing to do other than stop.
-scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
- WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) ->
- TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
- case WriteMarker of
- ?WRITE_OK_MARKER ->
- %% Here we take option 5 from
- %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
- %% which we read the Guid as a number, and then convert it
- %% back to a binary in order to work around bugs in
- %% Erlang's GC.
- <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> =
- <<GuidAndMsg:Size/binary>>,
- <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
- scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize);
- _ ->
- scan(Rest, Acc, Offset + TotalSize)
- end;
-scan(Data, Acc, Offset) ->
- {Data, Acc, Offset}.
+scanner(<<>>, Acc, Offset, _Fun) ->
+ {<<>>, Acc, Offset};
+scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset, _Fun) ->
+ {<<>>, Acc, Offset}; %% Nothing to do other than stop.
+scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
+ WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset, Fun) ->
+ TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
+ case WriteMarker of
+ ?WRITE_OK_MARKER ->
+ %% Here we take option 5 from
+ %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
+ %% which we read the Guid as a number, and then convert it
+ %% back to a binary in order to work around bugs in
+ %% Erlang's GC.
+ <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> =
+ <<GuidAndMsg:Size/binary>>,
+ <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
+ scanner(Rest, [Fun(Guid, TotalSize, Offset, Msg) | Acc],
+ Offset + TotalSize, Fun);
+ _ ->
+ scanner(Rest, Acc, Offset + TotalSize, Fun)
+ end;
+scanner(Data, Acc, Offset, _Fun) ->
+ {Data, Acc, Offset}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e9c356e1..bd8d61e8 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -26,16 +26,20 @@
-export([sync/1, set_maximum_since_use/2,
has_readers/2, combine_files/3, delete_file/2]). %% internal
+-export([transform_dir/3, force_recovery/2]). %% upgrade
+
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
%%----------------------------------------------------------------------------
-include("rabbit_msg_store.hrl").
+-include_lib("kernel/include/file.hrl").
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
+-define(TRANSFORM_TMP, "transform_tmp").
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read]).
@@ -160,6 +164,10 @@
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
deletion_thunk()).
-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()).
+-spec(force_recovery/2 :: (file:filename(), server()) -> 'ok').
+-spec(transform_dir/3 :: (file:filename(), server(),
+ fun ((binary())->({'ok', msg()} | {error, any()}))) ->
+ non_neg_integer()).
-endif.
@@ -1956,3 +1964,57 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{got, FinalOffsetZ},
{destination, Destination}]}
end.
+
+force_recovery(BaseDir, Server) ->
+ Dir = filename:join(BaseDir, atom_to_list(Server)),
+ file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
+ [file:delete(filename:join(Dir, File)) ||
+ File <- list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP)],
+ ok.
+
+transform_dir(BaseDir, Server, TransformFun) ->
+ Dir = filename:join(BaseDir, atom_to_list(Server)),
+ TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
+ case filelib:is_dir(TmpDir) of
+ true -> throw({error, previously_failed_transform});
+ false ->
+ Count = lists:sum(
+ [transform_msg_file(filename:join(Dir, File),
+ filename:join(TmpDir, File),
+ TransformFun) ||
+ File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)]),
+ [file:delete(filename:join(Dir, File)) ||
+ File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)],
+ [file:copy(filename:join(TmpDir, File), filename:join(Dir, File)) ||
+ File <- list_sorted_file_names(TmpDir, ?FILE_EXTENSION)],
+ [file:delete(filename:join(TmpDir, File)) ||
+ File <- list_sorted_file_names(TmpDir, ?FILE_EXTENSION)],
+ ok = file:del_dir(TmpDir),
+ Count
+ end.
+
+transform_msg_file(FileOld, FileNew, TransformFun) ->
+ rabbit_misc:ensure_parent_dirs_exist(FileNew),
+ {ok, #file_info{size=Size}} = file:read_file_info(FileOld),
+ {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
+ {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
+ [{write_buffer,
+ ?HANDLE_CACHE_BUFFER_SIZE}]),
+ {ok, Acc, Size} =
+ rabbit_msg_file:scan(
+ RefOld, Size,
+ fun(Guid, _Size, _Offset, BinMsg) ->
+ case TransformFun(BinMsg) of
+ {ok, MsgNew} ->
+ rabbit_msg_file:append(RefNew, Guid, MsgNew),
+ 1;
+ {error, Reason} ->
+ error_logger:error_msg("Message transform failed: ~p~n",
+ [Reason]),
+ 0
+ end
+ end),
+ file_handle_cache:close(RefOld),
+ file_handle_cache:close(RefNew),
+ lists:sum(Acc).
+
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 68b88b3e..f4e27cc8 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -25,6 +25,7 @@
-rabbit_upgrade({add_ip_to_listener, []}).
-rabbit_upgrade({internal_exchanges, []}).
-rabbit_upgrade({user_to_internal_user, [hash_passwords]}).
+-rabbit_upgrade({multiple_routing_keys, []}).
%% -------------------------------------------------------------------
@@ -35,6 +36,7 @@
-spec(add_ip_to_listener/0 :: () -> 'ok').
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
+-spec(multiple_routing_keys/0 :: () -> 'ok').
-endif.
@@ -101,3 +103,20 @@ mnesia(TableName, Fun, FieldList, NewRecordName) ->
{atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList,
NewRecordName),
ok.
+
+%%--------------------------------------------------------------------
+
+multiple_routing_keys() ->
+ _UpgradeMsgCount = rabbit_variable_queue:transform_storage(
+ fun (BinMsg) ->
+ case binary_to_term(BinMsg) of
+ {basic_message, ExchangeName, Routing_Key, Content, Guid,
+ Persistent} ->
+ {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+ Guid, Persistent}};
+ _ ->
+ {error, corrupt_message}
+ end
+ end),
+ ok.
+
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7142d560..f2176c0e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1]).
+ status/1, transform_storage/1]).
-export([start/1, stop/0]).
@@ -1801,3 +1801,16 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
push_betas_to_deltas(
Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
end.
+
+%%----------------------------------------------------------------------------
+%% Upgrading
+%%----------------------------------------------------------------------------
+
+%% Assumes message store is not running
+transform_storage(TransformFun) ->
+ transform_store(?PERSISTENT_MSG_STORE, TransformFun) +
+ transform_store(?TRANSIENT_MSG_STORE, TransformFun).
+
+transform_store(Store, TransformFun) ->
+ rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
+ rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).