summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-12-05 12:53:10 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-12-05 12:53:10 +0000
commit26eebb2bb949cb17cdffc8477224fcb3b7aa7f6d (patch)
tree0b52b541287ee3f289bc43c70636b263c6c67c24
parente979532601ea2f44fdc54179be8be27d5a09db16 (diff)
downloadrabbitmq-server-26eebb2bb949cb17cdffc8477224fcb3b7aa7f6d.tar.gz
Upgrade
-rw-r--r--src/rabbit_queue_index.erl39
1 files changed, 36 insertions, 3 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index a78dacec..08c20ce2 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -21,7 +21,7 @@
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
--export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]).
+-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -186,6 +186,7 @@
-rabbit_upgrade({add_queue_ttl, local, []}).
-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}).
+-rabbit_upgrade({store_msg, local, [store_msg_size]}).
-ifdef(use_specs).
@@ -1204,10 +1205,42 @@ store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
store_msg_size_segment(_) ->
stop.
+store_msg() ->
+ foreach_queue_index({fun store_msg_journal/1,
+ fun store_msg_segment/1}).
+
+store_msg_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ Rest/binary>>) ->
+ {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest};
+store_msg_journal(_) ->
+ stop.
+
+store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest};
+store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+store_msg_segment(_) ->
+ stop.
-%%----------------------------------------------------------------------------
-%% TODO here?
+
+
+%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
QueuesDir = queues_dir(),