diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-15 17:00:28 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-15 17:00:28 +0000 |
commit | cc5413d3b863c14ccca00b2a4feb9c99589da170 (patch) | |
tree | 645569c2e5f2a4487c3900dc6e5ce3189833ac70 | |
parent | baafede4c670ef0eec6a02c247930b1b11fb310d (diff) | |
download | rabbitmq-server-cc5413d3b863c14ccca00b2a4feb9c99589da170.tar.gz |
Read segment files in one go
-rw-r--r-- | src/rabbit_queue_index.erl | 79 |
1 files changed, 41 insertions, 38 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 8227e4cd..d3a82fbf 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -545,20 +545,22 @@ expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. read_pub_record_body(Hdl) -> case file_handle_cache:read(Hdl, ?MSG_ID_BYTES + ?EXPIRY_BYTES) of - {ok, Bin} -> - %% work around for binary data fragmentation. See - %% rabbit_msg_file:read_next/2 - <<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>> = Bin, - <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, - Exp = case Expiry of - ?NO_EXPIRY -> undefined; - X -> X - end, - {MsgId, #message_properties{expiry = Exp}}; - Error -> - Error + {ok, Bin} -> {MsgId, MsgProps, <<>>} = extract_pub_record_body(Bin), + {MsgId, MsgProps}; + Error -> Error end. +extract_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, + Rest/binary>>) -> + %% work around for binary data fragmentation. See + %% rabbit_msg_file:read_next/2 + <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, + Exp = case Expiry of + ?NO_EXPIRY -> undefined; + X -> X + end, + {MsgId, #message_properties{expiry = Exp}, Rest}. + %%---------------------------------------------------------------------------- %% journal manipulation %%---------------------------------------------------------------------------- @@ -845,36 +847,37 @@ load_segment(KeepAcked, #segment { path = Path }) -> false -> {array_new(), 0}; true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), - Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), + {ok, SegData} = file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE), + Res = load_segment_entries(KeepAcked, SegData, array_new(), 0), ok = file_handle_cache:close(Hdl), Res end. -load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> - case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of - {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> - {MsgId, MsgProps} = read_pub_record_body(Hdl), - Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, - SegEntries1 = array:set(RelSeq, Obj, SegEntries), - load_segment_entries(KeepAcked, Hdl, SegEntries1, - UnackedCount + 1); - {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>} -> - {UnackedCountDelta, SegEntries1} = - case array:get(RelSeq, SegEntries) of - {Pub, no_del, no_ack} -> - { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; - {Pub, del, no_ack} when KeepAcked -> - {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; - {_Pub, del, no_ack} -> - {-1, array:reset(RelSeq, SegEntries)} - end, - load_segment_entries(KeepAcked, Hdl, SegEntries1, - UnackedCount + UnackedCountDelta); - _ErrOrEoF -> - {SegEntries, UnackedCount} - end. +load_segment_entries(KeepAcked, + <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, SegData/binary>>, + SegEntries, UnackedCount) -> + {MsgId, MsgProps, SegData1} = extract_pub_record_body(SegData), + Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, + SegEntries1 = array:set(RelSeq, Obj, SegEntries), + load_segment_entries(KeepAcked, SegData1, SegEntries1, UnackedCount + 1); +load_segment_entries(KeepAcked, + <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, SegData/binary>>, + SegEntries, UnackedCount) -> + {UnackedCountDelta, SegEntries1} = + case array:get(RelSeq, SegEntries) of + {Pub, no_del, no_ack} -> + { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; + {Pub, del, no_ack} when KeepAcked -> + {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; + {_Pub, del, no_ack} -> + {-1, array:reset(RelSeq, SegEntries)} + end, + load_segment_entries(KeepAcked, SegData, SegEntries1, + UnackedCount + UnackedCountDelta); +load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) -> + {SegEntries, UnackedCount}. array_new() -> array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). |