diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-12-05 12:53:10 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-12-05 12:53:10 +0000 |
commit | 26eebb2bb949cb17cdffc8477224fcb3b7aa7f6d (patch) | |
tree | 0b52b541287ee3f289bc43c70636b263c6c67c24 | |
parent | e979532601ea2f44fdc54179be8be27d5a09db16 (diff) | |
download | rabbitmq-server-26eebb2bb949cb17cdffc8477224fcb3b7aa7f6d.tar.gz |
Upgrade
-rw-r--r-- | src/rabbit_queue_index.erl | 39 |
1 files changed, 36 insertions, 3 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index a78dacec..08c20ce2 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -21,7 +21,7 @@ publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). --export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]). +-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -186,6 +186,7 @@ -rabbit_upgrade({add_queue_ttl, local, []}). -rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). -rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}). +-rabbit_upgrade({store_msg, local, [store_msg_size]}). -ifdef(use_specs). @@ -1204,10 +1205,42 @@ store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, store_msg_size_segment(_) -> stop. +store_msg() -> + foreach_queue_index({fun store_msg_journal/1, + fun store_msg_segment/1}). + +store_msg_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + Rest/binary>>) -> + {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest}; +store_msg_journal(_) -> + stop. + +store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) -> + {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest}; +store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +store_msg_segment(_) -> + stop. -%%---------------------------------------------------------------------------- -%% TODO here? + + +%%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> QueuesDir = queues_dir(), |