diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-02-22 16:57:39 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-02-22 16:57:39 +0000 |
commit | 9d3eb1f0bd42cc23d3ad2474721d0a0a4b4fcf8e (patch) | |
tree | 6468a37f8a70a80eb041593550829ee4e5f69e83 | |
parent | 102eb1221e34274c2fa54595d3c2fd258645f410 (diff) | |
download | rabbitmq-server-9d3eb1f0bd42cc23d3ad2474721d0a0a4b4fcf8e.tar.gz |
Revert re-arrangement of upgrade steps
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 4 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 33 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 29 |
4 files changed, 35 insertions, 33 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 4889abff..17cdedc2 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -65,4 +65,4 @@ -spec(idle_timeout/1 :: (state()) -> state()). -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). --spec(store_names/0 :: () -> [atom()]). +-spec(multiple_routing_keys/0 :: () -> 'ok'). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 376a303e..f29cc805 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -43,8 +43,8 @@ properties_input(), binary()) -> rabbit_types:message()). -spec(message/3 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - rabbit_types:decoded_content()) -> {'ok', rabbit_types:message()} | - {'error', any()}). + rabbit_types:decoded_content()) -> + rabbit_types:ok_or_error2(rabbit_types:message() | any())). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index d798c4f7..ef0e2e0d 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -26,7 +26,7 @@ -export([sync/1, set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal --export([multiple_routing_keys/0]). %% upgrade +-export([transform_dir/3, force_recovery/2]). %% upgrade -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). @@ -34,9 +34,8 @@ %%---------------------------------------------------------------------------- -include("rabbit_msg_store.hrl"). --include_lib("kernel/include/file.hrl"). --define(SYNC_INTERVAL, 25). %% milliseconds +-define(SYNC_INTERVAL, 5). %% milliseconds -define(CLEAN_FILENAME, "clean.dot"). -define(FILE_SUMMARY_FILENAME, "file_summary.ets"). -define(TRANSFORM_TMP, "transform_tmp"). @@ -106,8 +105,6 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({multiple_routing_keys, []}). - -ifdef(use_specs). -export_type([gc_state/0, file_num/0]). @@ -166,7 +163,9 @@ -spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> deletion_thunk()). -spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()). --spec(multiple_routing_keys/0 :: () -> 'ok'). +-spec(force_recovery/2 :: (file:filename(), server()) -> 'ok'). +-spec(transform_dir/3 :: (file:filename(), server(), + fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok'). -endif. @@ -1968,25 +1967,6 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {destination, Destination}]} end. -%%---------------------------------------------------------------------------- -%% upgrade -%%---------------------------------------------------------------------------- - -multiple_routing_keys() -> - [transform_store( - fun ({basic_message, ExchangeName, Routing_Key, Content, - Guid, Persistent}) -> - {ok, {basic_message, ExchangeName, [Routing_Key], Content, - Guid, Persistent}}; - (_) -> {error, corrupt_message} - end, Store) || Store <- rabbit_variable_queue:store_names()], - ok. - -%% Assumes message store is not running -transform_store(TransformFun, Store) -> - force_recovery(rabbit_mnesia:dir(), Store), - transform_dir(rabbit_mnesia:dir(), Store, TransformFun). - force_recovery(BaseDir, Store) -> Dir = filename:join(BaseDir, atom_to_list(Store)), file:delete(filename:join(Dir, ?CLEAN_FILENAME)), @@ -2017,14 +1997,13 @@ transform_dir(BaseDir, Store, TransformFun) -> transform_msg_file(FileOld, FileNew, TransformFun) -> rabbit_misc:ensure_parent_dirs_exist(FileNew), - {ok, #file_info{size=Size}} = file:read_file_info(FileOld), {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []), {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write], [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]), {ok, _Acc, _IgnoreSize} = rabbit_msg_file:scan( - RefOld, Size, + RefOld, filelib:file_size(FileOld), fun({Guid, _Size, _Offset, BinMsg}, ok) -> case TransformFun(binary_to_term(BinMsg)) of {ok, MsgNew} -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 4eb9c3b8..3ef76d15 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, store_names/0]). + status/1, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -294,6 +294,8 @@ %%---------------------------------------------------------------------------- +-rabbit_upgrade({multiple_routing_keys, []}). + -ifdef(use_specs). -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). @@ -1802,5 +1804,26 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1) end. -store_names() -> - [?PERSISTENT_MSG_STORE, ?TRANSIENT_MSG_STORE]. +%%---------------------------------------------------------------------------- +%% Upgrading +%%---------------------------------------------------------------------------- + +multiple_routing_keys() -> + transform_storage( + fun ({basic_message, ExchangeName, Routing_Key, Content, + Guid, Persistent}) -> + {ok, {basic_message, ExchangeName, [Routing_Key], Content, + Guid, Persistent}}; + (_) -> {error, corrupt_message} + end), + ok. + + +%% Assumes message store is not running +transform_storage(TransformFun) -> + transform_store(?PERSISTENT_MSG_STORE, TransformFun), + transform_store(?TRANSIENT_MSG_STORE, TransformFun). + +transform_store(Store, TransformFun) -> + rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), + rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). |