diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-05 17:37:03 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-05 17:37:03 +0100 |
commit | 2c9b9305342270852d171ba7035242fe6d3986ac (patch) | |
tree | e1725955806036c34dd2c55bad98840001fb2c10 | |
parent | 26a6391d735baa50fa21dd688d689505d9c63dd3 (diff) | |
download | rabbitmq-server-2c9b9305342270852d171ba7035242fe6d3986ac.tar.gz |
Upgrades to the queue index and recovery terms format.
-rw-r--r-- | src/rabbit_queue_index.erl | 43 | ||||
-rw-r--r-- | src/rabbit_recovery_terms.erl | 22 |
2 files changed, 60 insertions, 5 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 08c81a94..923abb17 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -21,7 +21,7 @@ publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). --export([add_queue_ttl/0, avoid_zeroes/0]). +-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -171,8 +171,9 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, local, []}). --rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). +-rabbit_upgrade({add_queue_ttl, local, []}). +-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). +-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}). -ifdef(use_specs). @@ -1078,6 +1079,42 @@ avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS, avoid_zeroes_segment(_) -> stop. +%% At upgrade time we just define every message's size as 0 - that +%% will save us a load of faff with the message store, and means we +%% can actually use the clean recovery terms in VQ. It does mean we +%% don't count message bodies from before the migration, but we can +%% live with that. +store_msg_size() -> + foreach_queue_index({fun store_msg_size_journal/1, + fun store_msg_size_segment/1}). + +store_msg_size_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_size_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_size_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, + Rest/binary>>) -> + {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest}; +store_msg_size_journal(_) -> + stop. + +store_msg_size_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Rest/binary>>) -> + {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest}; +store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +store_msg_size_segment(_) -> + stop. + + %%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index bbf38f58..f169e13d 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -23,11 +23,14 @@ -export([start/0, stop/0, store/2, read/1, clear/0]). --export([upgrade_recovery_terms/0, start_link/0]). +-export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([upgrade_recovery_terms/0, persistent_bytes/0]). + -rabbit_upgrade({upgrade_recovery_terms, local, []}). +-rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}). %%---------------------------------------------------------------------------- @@ -61,6 +64,8 @@ clear() -> dets:delete_all_objects(?MODULE), flush(). +start_link() -> gen_server:start_link(?MODULE, [], []). + %%---------------------------------------------------------------------------- upgrade_recovery_terms() -> @@ -84,7 +89,20 @@ upgrade_recovery_terms() -> close_table() end. -start_link() -> gen_server:start_link(?MODULE, [], []). +persistent_bytes() -> dets_upgrade(fun persistent_bytes/1). +persistent_bytes(Props) -> Props ++ [{persistent_bytes, 0}]. + +dets_upgrade(Fun)-> + open_table(), + try + ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) -> + store(DirBaseName, Fun(Terms)), + Acc + end, ok, ?MODULE), + ok + after + close_table() + end. %%---------------------------------------------------------------------------- |