summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-05 17:37:03 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-05 17:37:03 +0100
commit2c9b9305342270852d171ba7035242fe6d3986ac (patch)
treee1725955806036c34dd2c55bad98840001fb2c10
parent26a6391d735baa50fa21dd688d689505d9c63dd3 (diff)
downloadrabbitmq-server-2c9b9305342270852d171ba7035242fe6d3986ac.tar.gz
Upgrades to the queue index and recovery terms format.
-rw-r--r--src/rabbit_queue_index.erl43
-rw-r--r--src/rabbit_recovery_terms.erl22
2 files changed, 60 insertions, 5 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 08c81a94..923abb17 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -21,7 +21,7 @@
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
--export([add_queue_ttl/0, avoid_zeroes/0]).
+-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -171,8 +171,9 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({add_queue_ttl, local, []}).
--rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
+-rabbit_upgrade({add_queue_ttl, local, []}).
+-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
+-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}).
-ifdef(use_specs).
@@ -1078,6 +1079,42 @@ avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS,
avoid_zeroes_segment(_) ->
stop.
+%% At upgrade time we just define every message's size as 0 - that
+%% will save us a load of faff with the message store, and means we
+%% can actually use the clean recovery terms in VQ. It does mean we
+%% don't count message bodies from before the migration, but we can
+%% live with that.
+store_msg_size() ->
+ foreach_queue_index({fun store_msg_size_journal/1,
+ fun store_msg_size_segment/1}).
+
+store_msg_size_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_size_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_size_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Rest/binary>>) ->
+ {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
+store_msg_size_journal(_) ->
+ stop.
+
+store_msg_size_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
+store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+store_msg_size_segment(_) ->
+ stop.
+
+
%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index bbf38f58..f169e13d 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -23,11 +23,14 @@
-export([start/0, stop/0, store/2, read/1, clear/0]).
--export([upgrade_recovery_terms/0, start_link/0]).
+-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-export([upgrade_recovery_terms/0, persistent_bytes/0]).
+
-rabbit_upgrade({upgrade_recovery_terms, local, []}).
+-rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}).
%%----------------------------------------------------------------------------
@@ -61,6 +64,8 @@ clear() ->
dets:delete_all_objects(?MODULE),
flush().
+start_link() -> gen_server:start_link(?MODULE, [], []).
+
%%----------------------------------------------------------------------------
upgrade_recovery_terms() ->
@@ -84,7 +89,20 @@ upgrade_recovery_terms() ->
close_table()
end.
-start_link() -> gen_server:start_link(?MODULE, [], []).
+persistent_bytes() -> dets_upgrade(fun persistent_bytes/1).
+persistent_bytes(Props) -> Props ++ [{persistent_bytes, 0}].
+
+dets_upgrade(Fun)->
+ open_table(),
+ try
+ ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) ->
+ store(DirBaseName, Fun(Terms)),
+ Acc
+ end, ok, ?MODULE),
+ ok
+ after
+ close_table()
+ end.
%%----------------------------------------------------------------------------