summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-11-28 20:37:37 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-11-28 20:37:37 +0000
commitcebeff693c6d5f48dc4e94197e7808236d99d21d (patch)
treed5c2efd8314a7748a489532b0b4c9a8314c526d9
parent19cffc781d7ec36a3a64b9b4c1526c68f85695aa (diff)
downloadrabbitmq-server-cebeff693c6d5f48dc4e94197e7808236d99d21d.tar.gz
ignore empty segment files
We already ignore corrupted content.
-rw-r--r--src/rabbit_queue_index.erl25
1 files changed, 14 insertions, 11 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4b545466..d729a7ed 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -837,13 +837,16 @@ segment_entries_foldr(Fun, Init,
%%
%% Does not do any combining with the journal at all.
load_segment(KeepAcked, #segment { path = Path }) ->
+ Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
- false -> {array_new(), 0};
+ false -> Empty;
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- {ok, SegData} = file_handle_cache:read(
- Hdl, ?SEGMENT_TOTAL_SIZE),
- Res = load_segment_entries(KeepAcked, SegData, array_new(), 0),
+ Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of
+ {ok, SegData} -> load_segment_entries(
+ KeepAcked, SegData, Empty);
+ eof -> Empty
+ end,
ok = file_handle_cache:close(Hdl),
Res
end.
@@ -853,15 +856,15 @@ load_segment_entries(KeepAcked,
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
SegData/binary>>,
- SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}) ->
{MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, SegData, SegEntries1, UnackedCount + 1);
+ load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1});
load_segment_entries(KeepAcked,
<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS, SegData/binary>>,
- SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}) ->
{UnackedCountDelta, SegEntries1} =
case array:get(RelSeq, SegEntries) of
{Pub, no_del, no_ack} ->
@@ -871,10 +874,10 @@ load_segment_entries(KeepAcked,
{_Pub, del, no_ack} ->
{-1, array:reset(RelSeq, SegEntries)}
end,
- load_segment_entries(KeepAcked, SegData, SegEntries1,
- UnackedCount + UnackedCountDelta);
-load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) ->
- {SegEntries, UnackedCount}.
+ load_segment_entries(KeepAcked, SegData,
+ {SegEntries1, UnackedCount + UnackedCountDelta});
+load_segment_entries(_KeepAcked, _SegData, Res) ->
+ Res.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).