summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-15 17:00:28 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-15 17:00:28 +0000
commitcc5413d3b863c14ccca00b2a4feb9c99589da170 (patch)
tree645569c2e5f2a4487c3900dc6e5ce3189833ac70
parentbaafede4c670ef0eec6a02c247930b1b11fb310d (diff)
downloadrabbitmq-server-cc5413d3b863c14ccca00b2a4feb9c99589da170.tar.gz
Read segment files in one go
-rw-r--r--src/rabbit_queue_index.erl79
1 files changed, 41 insertions, 38 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 8227e4cd..d3a82fbf 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -545,20 +545,22 @@ expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
read_pub_record_body(Hdl) ->
case file_handle_cache:read(Hdl, ?MSG_ID_BYTES + ?EXPIRY_BYTES) of
- {ok, Bin} ->
- %% work around for binary data fragmentation. See
- %% rabbit_msg_file:read_next/2
- <<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
- <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
- Exp = case Expiry of
- ?NO_EXPIRY -> undefined;
- X -> X
- end,
- {MsgId, #message_properties{expiry = Exp}};
- Error ->
- Error
+ {ok, Bin} -> {MsgId, MsgProps, <<>>} = extract_pub_record_body(Bin),
+ {MsgId, MsgProps};
+ Error -> Error
end.
+extract_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Rest/binary>>) ->
+ %% work around for binary data fragmentation. See
+ %% rabbit_msg_file:read_next/2
+ <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
+ Exp = case Expiry of
+ ?NO_EXPIRY -> undefined;
+ X -> X
+ end,
+ {MsgId, #message_properties{expiry = Exp}, Rest}.
+
%%----------------------------------------------------------------------------
%% journal manipulation
%%----------------------------------------------------------------------------
@@ -845,36 +847,37 @@ load_segment(KeepAcked, #segment { path = Path }) ->
false -> {array_new(), 0};
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0),
+ {ok, SegData} = file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE),
+ Res = load_segment_entries(KeepAcked, SegData, array_new(), 0),
ok = file_handle_cache:close(Hdl),
Res
end.
-load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
- case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
- {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
- {MsgId, MsgProps} = read_pub_record_body(Hdl),
- Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
- SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, Hdl, SegEntries1,
- UnackedCount + 1);
- {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>} ->
- {UnackedCountDelta, SegEntries1} =
- case array:get(RelSeq, SegEntries) of
- {Pub, no_del, no_ack} ->
- { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
- {Pub, del, no_ack} when KeepAcked ->
- {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
- {_Pub, del, no_ack} ->
- {-1, array:reset(RelSeq, SegEntries)}
- end,
- load_segment_entries(KeepAcked, Hdl, SegEntries1,
- UnackedCount + UnackedCountDelta);
- _ErrOrEoF ->
- {SegEntries, UnackedCount}
- end.
+load_segment_entries(KeepAcked,
+ <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, SegData/binary>>,
+ SegEntries, UnackedCount) ->
+ {MsgId, MsgProps, SegData1} = extract_pub_record_body(SegData),
+ Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
+ SegEntries1 = array:set(RelSeq, Obj, SegEntries),
+ load_segment_entries(KeepAcked, SegData1, SegEntries1, UnackedCount + 1);
+load_segment_entries(KeepAcked,
+ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, SegData/binary>>,
+ SegEntries, UnackedCount) ->
+ {UnackedCountDelta, SegEntries1} =
+ case array:get(RelSeq, SegEntries) of
+ {Pub, no_del, no_ack} ->
+ { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
+ {Pub, del, no_ack} when KeepAcked ->
+ {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
+ {_Pub, del, no_ack} ->
+ {-1, array:reset(RelSeq, SegEntries)}
+ end,
+ load_segment_entries(KeepAcked, SegData, SegEntries1,
+ UnackedCount + UnackedCountDelta);
+load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).