summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-02-22 16:57:39 +0000
committerEmile Joubert <emile@rabbitmq.com>2011-02-22 16:57:39 +0000
commit9d3eb1f0bd42cc23d3ad2474721d0a0a4b4fcf8e (patch)
tree6468a37f8a70a80eb041593550829ee4e5f69e83
parent102eb1221e34274c2fa54595d3c2fd258645f410 (diff)
downloadrabbitmq-server-9d3eb1f0bd42cc23d3ad2474721d0a0a4b4fcf8e.tar.gz
Revert re-arrangement of upgrade steps
-rw-r--r--include/rabbit_backing_queue_spec.hrl2
-rw-r--r--src/rabbit_basic.erl4
-rw-r--r--src/rabbit_msg_store.erl33
-rw-r--r--src/rabbit_variable_queue.erl29
4 files changed, 35 insertions, 33 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 4889abff..17cdedc2 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -65,4 +65,4 @@
-spec(idle_timeout/1 :: (state()) -> state()).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
--spec(store_names/0 :: () -> [atom()]).
+-spec(multiple_routing_keys/0 :: () -> 'ok').
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 376a303e..f29cc805 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -43,8 +43,8 @@
properties_input(), binary()) -> rabbit_types:message()).
-spec(message/3 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- rabbit_types:decoded_content()) -> {'ok', rabbit_types:message()} |
- {'error', any()}).
+ rabbit_types:decoded_content()) ->
+ rabbit_types:ok_or_error2(rabbit_types:message() | any())).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index d798c4f7..ef0e2e0d 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -26,7 +26,7 @@
-export([sync/1, set_maximum_since_use/2,
has_readers/2, combine_files/3, delete_file/2]). %% internal
--export([multiple_routing_keys/0]). %% upgrade
+-export([transform_dir/3, force_recovery/2]). %% upgrade
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
@@ -34,9 +34,8 @@
%%----------------------------------------------------------------------------
-include("rabbit_msg_store.hrl").
--include_lib("kernel/include/file.hrl").
--define(SYNC_INTERVAL, 25). %% milliseconds
+-define(SYNC_INTERVAL, 5). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
-define(TRANSFORM_TMP, "transform_tmp").
@@ -106,8 +105,6 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({multiple_routing_keys, []}).
-
-ifdef(use_specs).
-export_type([gc_state/0, file_num/0]).
@@ -166,7 +163,9 @@
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
deletion_thunk()).
-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()).
--spec(multiple_routing_keys/0 :: () -> 'ok').
+-spec(force_recovery/2 :: (file:filename(), server()) -> 'ok').
+-spec(transform_dir/3 :: (file:filename(), server(),
+ fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok').
-endif.
@@ -1968,25 +1967,6 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{destination, Destination}]}
end.
-%%----------------------------------------------------------------------------
-%% upgrade
-%%----------------------------------------------------------------------------
-
-multiple_routing_keys() ->
- [transform_store(
- fun ({basic_message, ExchangeName, Routing_Key, Content,
- Guid, Persistent}) ->
- {ok, {basic_message, ExchangeName, [Routing_Key], Content,
- Guid, Persistent}};
- (_) -> {error, corrupt_message}
- end, Store) || Store <- rabbit_variable_queue:store_names()],
- ok.
-
-%% Assumes message store is not running
-transform_store(TransformFun, Store) ->
- force_recovery(rabbit_mnesia:dir(), Store),
- transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
-
force_recovery(BaseDir, Store) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
@@ -2017,14 +1997,13 @@ transform_dir(BaseDir, Store, TransformFun) ->
transform_msg_file(FileOld, FileNew, TransformFun) ->
rabbit_misc:ensure_parent_dirs_exist(FileNew),
- {ok, #file_info{size=Size}} = file:read_file_info(FileOld),
{ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
{ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
[{write_buffer,
?HANDLE_CACHE_BUFFER_SIZE}]),
{ok, _Acc, _IgnoreSize} =
rabbit_msg_file:scan(
- RefOld, Size,
+ RefOld, filelib:file_size(FileOld),
fun({Guid, _Size, _Offset, BinMsg}, ok) ->
case TransformFun(binary_to_term(BinMsg)) of
{ok, MsgNew} ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 4eb9c3b8..3ef76d15 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, store_names/0]).
+ status/1, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -294,6 +294,8 @@
%%----------------------------------------------------------------------------
+-rabbit_upgrade({multiple_routing_keys, []}).
+
-ifdef(use_specs).
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
@@ -1802,5 +1804,26 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
end.
-store_names() ->
- [?PERSISTENT_MSG_STORE, ?TRANSIENT_MSG_STORE].
+%%----------------------------------------------------------------------------
+%% Upgrading
+%%----------------------------------------------------------------------------
+
+multiple_routing_keys() ->
+ transform_storage(
+ fun ({basic_message, ExchangeName, Routing_Key, Content,
+ Guid, Persistent}) ->
+ {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+ Guid, Persistent}};
+ (_) -> {error, corrupt_message}
+ end),
+ ok.
+
+
+%% Assumes message store is not running
+transform_storage(TransformFun) ->
+ transform_store(?PERSISTENT_MSG_STORE, TransformFun),
+ transform_store(?TRANSIENT_MSG_STORE, TransformFun).
+
+transform_store(Store, TransformFun) ->
+ rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
+ rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).