diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-05-14 13:10:37 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-05-14 13:10:37 +0100 |
commit | a452b5d2b0fc0bf22a5b35d3df7abfe4de32afea (patch) | |
tree | 39fa4758f25db1ea115de2598a43e358a53dab7f | |
parent | 2eea376912d2749a0e93a09317dbda7aab558fa2 (diff) | |
download | rabbitmq-server-a452b5d2b0fc0bf22a5b35d3df7abfe4de32afea.tar.gz |
Cope with trailing zeroes in journal and segment files
Makes the on-disk journal format tolerant of runs of zeroes at the end of the file.
This could occur if file operations are interrupted by a crashing server.
Also changes the definition of segment prefixes so that runs of zero bytes will
never appear in a valid segment.
-rw-r--r-- | src/rabbit_queue_index.erl | 31 |
1 files changed, 27 insertions, 4 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index ea70208f..230639ee 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -23,7 +23,7 @@ -export([scan/3]). --export([add_queue_ttl/0]). +-export([add_queue_ttl/0, avoid_zeroes/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -125,7 +125,7 @@ %% seq only is binary 00 followed by 14 bits of rel seq id %% (range: 0 - 16383) --define(REL_SEQ_ONLY_PREFIX, 00). +-define(REL_SEQ_ONLY_PREFIX, 01). -define(REL_SEQ_ONLY_PREFIX_BITS, 2). -define(REL_SEQ_ONLY_RECORD_BYTES, 2). @@ -171,6 +171,7 @@ %%---------------------------------------------------------------------------- -rabbit_upgrade({add_queue_ttl, local, []}). +-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). -ifdef(use_specs). @@ -715,7 +716,12 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of - {ok, Bin} -> + %% Journal entry composed only of zeroes was probably + %% produced during a dirty shutdown so stop reading + {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} + when Prefix =:= ?PUB_PERSIST_JPREFIX -> + State; + {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary-unit:8>>} -> {MsgId, MsgProps} = parse_pub_record_body(Bin), IsPersistent = case Prefix of ?PUB_PERSIST_JPREFIX -> true; @@ -1057,6 +1063,21 @@ add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, add_queue_ttl_segment(_) -> stop. +avoid_zeroes() -> + foreach_queue_index({none, fun avoid_zeroes_segment/1}). + +avoid_zeroes_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Rest/binary>>) -> + {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>, Rest}; +avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +avoid_zeroes_segment(_) -> + stop. + %%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> @@ -1081,7 +1102,9 @@ transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> || Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)], ok = gatherer:finish(Gatherer). -transform_file(Path, Fun) -> +transform_file(_Path, none) -> + ok; +transform_file(Path, Fun) when is_function(Fun)-> PathTmp = Path ++ ".upgrade", case rabbit_file:file_size(Path) of 0 -> ok; |