summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-05-14 13:10:37 +0100
committerEmile Joubert <emile@rabbitmq.com>2013-05-14 13:10:37 +0100
commita452b5d2b0fc0bf22a5b35d3df7abfe4de32afea (patch)
tree39fa4758f25db1ea115de2598a43e358a53dab7f
parent2eea376912d2749a0e93a09317dbda7aab558fa2 (diff)
downloadrabbitmq-server-a452b5d2b0fc0bf22a5b35d3df7abfe4de32afea.tar.gz
Cope with trailing zeroes in journal and segment files
Makes the on-disk journal format tolerant of runs of zeroes at the end of the file. This could occur if file operations are interrupted by a crashing server. Also changes the definition of segment prefixes so that runs of zero bytes will never appear in a valid segment.
-rw-r--r--src/rabbit_queue_index.erl31
1 files changed, 27 insertions, 4 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index ea70208f..230639ee 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -23,7 +23,7 @@
-export([scan/3]).
--export([add_queue_ttl/0]).
+-export([add_queue_ttl/0, avoid_zeroes/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -125,7 +125,7 @@
%% seq only is binary 00 followed by 14 bits of rel seq id
%% (range: 0 - 16383)
--define(REL_SEQ_ONLY_PREFIX, 00).
+-define(REL_SEQ_ONLY_PREFIX, 01).
-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
@@ -171,6 +171,7 @@
%%----------------------------------------------------------------------------
-rabbit_upgrade({add_queue_ttl, local, []}).
+-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
-ifdef(use_specs).
@@ -715,7 +716,12 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
- {ok, Bin} ->
+ %% Journal entry composed only of zeroes was probably
+ %% produced during a dirty shutdown so stop reading
+ {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>}
+ when Prefix =:= ?PUB_PERSIST_JPREFIX ->
+ State;
+ {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary-unit:8>>} ->
{MsgId, MsgProps} = parse_pub_record_body(Bin),
IsPersistent = case Prefix of
?PUB_PERSIST_JPREFIX -> true;
@@ -1057,6 +1063,21 @@ add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
add_queue_ttl_segment(_) ->
stop.
+avoid_zeroes() ->
+ foreach_queue_index({none, fun avoid_zeroes_segment/1}).
+
+avoid_zeroes_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>, Rest};
+avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+avoid_zeroes_segment(_) ->
+ stop.
+
%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
@@ -1081,7 +1102,9 @@ transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
|| Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)],
ok = gatherer:finish(Gatherer).
-transform_file(Path, Fun) ->
+transform_file(_Path, none) ->
+ ok;
+transform_file(Path, Fun) when is_function(Fun)->
PathTmp = Path ++ ".upgrade",
case rabbit_file:file_size(Path) of
0 -> ok;