diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-08-21 15:14:59 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-08-21 15:14:59 +0100 |
commit | 77d688e5fddb1daa4374f42698770e6737e10ea0 (patch) | |
tree | 1d108bd2b08cc32505c30956da62a806b4165488 | |
parent | c3c3e7fad74142ba446acb990c7b303ec9f71e48 (diff) | |
download | rabbitmq-server-77d688e5fddb1daa4374f42698770e6737e10ea0.tar.gz |
cosmetic -> with_queue_head => queue_head
Also, time for a new optimisation! YAY!
Previously, reading a message off disk meant seeking to the correct position and then reading the data. Now if the handle is already in the right position, then that seek is a waste of quite a lot of time, as it is an OS call. Now, I cache the location of the handle and so avoid seeking when possible. This has a MASSIVE effect on performance, especially in straight line cases, eg where a single prefetcher can drain a queue of disk in about one third of the time it used to take. Just looking at the code coverage from the test suite, there were just 534 seeks and 8582 cases where we found the handle in the right position already. This is a fairly small amount of code, and provides very useful benefits.
-rw-r--r-- | src/rabbit_disk_queue.erl | 56 |
1 files changed, 33 insertions, 23 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 4b8759f8..d19469d6 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -754,7 +754,7 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, Obj) -> ets:match_object(MsgLocationEts, Obj). -get_read_handle(File, Offset, State = +get_read_handle(File, Offset, TotalSize, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, read_file_handles_limit = ReadFileHandlesLimit, current_file_name = CurName, @@ -766,7 +766,8 @@ get_read_handle(File, Offset, State = true -> State end, Now = now(), - {FileHdl, ReadHdls1, ReadHdlsAge1} = + NewOffset = Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + {FileHdl, OldOffset, ReadHdls1, ReadHdlsAge1} = case dict:find(File, ReadHdls) of error -> {ok, Hdl} = file:open(form_filename(File), @@ -774,21 +775,21 @@ get_read_handle(File, Offset, State = read_ahead]), case dict:size(ReadHdls) < ReadFileHandlesLimit of true -> - {Hdl, ReadHdls, ReadHdlsAge}; - _False -> + {Hdl, 0, ReadHdls, ReadHdlsAge}; + false -> {Then, OldFile, ReadHdlsAge2} = gb_trees:take_smallest(ReadHdlsAge), - {ok, {OldHdl, Then}} = + {ok, {OldHdl, _Offset, Then}} = dict:find(OldFile, ReadHdls), ok = file:close(OldHdl), - {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2} + {Hdl, 0, dict:erase(OldFile, ReadHdls), ReadHdlsAge2} end; - {ok, {Hdl, Then}} -> - {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} + {ok, {Hdl, OldOffset1, Then}} -> + {Hdl, OldOffset1, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} end, - ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1), + ReadHdls2 = dict:store(File, {FileHdl, NewOffset, Now}, ReadHdls1), ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1), - {FileHdl, + {FileHdl, Offset /= OldOffset, State1 #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3} }}. sequence_lookup(Sequences, Q) -> @@ -874,7 +875,7 @@ cache_is_full(#dqstate { message_cache = Cache }) -> %% ---- INTERNAL RAW FUNCTIONS ---- internal_fetch_body(Q, MarkDelivered, Advance, State) -> - case with_queue_head(Q, MarkDelivered, Advance, State) of + case queue_head(Q, MarkDelivered, Advance, State) of E = {ok, empty, _} -> E; {ok, AckTag, IsDelivered, StoreEntry, Remaining, State1} -> {Message, State2} = read_stored_message(StoreEntry, State1), @@ -882,7 +883,7 @@ internal_fetch_body(Q, MarkDelivered, Advance, State) -> end. internal_fetch_attributes(Q, MarkDelivered, Advance, State) -> - case with_queue_head(Q, MarkDelivered, Advance, State) of + case queue_head(Q, MarkDelivered, Advance, State) of E = {ok, empty, _} -> E; {ok, AckTag, IsDelivered, #message_store_entry { msg_id = MsgId, is_persistent = IsPersistent }, @@ -890,8 +891,8 @@ internal_fetch_attributes(Q, MarkDelivered, Advance, State) -> {ok, {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}, State1} end. -with_queue_head(Q, MarkDelivered, Advance, - State = #dqstate { sequences = Sequences }) -> +queue_head(Q, MarkDelivered, Advance, + State = #dqstate { sequences = Sequences }) -> case sequence_lookup(Sequences, Q) of {SeqId, SeqId} -> {ok, empty, State}; {ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId -> @@ -913,9 +914,10 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount, total_size = TotalSize }, State) -> case fetch_and_increment_cache(MsgId, State) of not_found -> - {FileHdl, State1} = get_read_handle(File, Offset, State), + {FileHdl, SeekReq, State1} = + get_read_handle(File, Offset, TotalSize, State), {ok, {MsgBody, _IsPersistent, EncodedBodySize}} = - read_message_at_offset(FileHdl, Offset, TotalSize), + read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq), Message = #basic_message {} = bin_to_msg(MsgBody), ok = if RefCount > 1 -> insert_into_cache(Message, EncodedBodySize, State1); @@ -1480,7 +1482,7 @@ close_file(File, State = #dqstate { read_file_handles = case dict:find(File, ReadHdls) of error -> State; - {ok, {Hdl, Then}} -> + {ok, {Hdl, _Offset, Then}} -> ok = file:close(Hdl), State #dqstate { read_file_handles = { dict:erase(File, ReadHdls), @@ -1867,10 +1869,17 @@ append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) -> KO -> KO end. -read_message_at_offset(FileHdl, Offset, TotalSize) -> +read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq) -> TotalSizeWriteOkBytes = TotalSize + 1, - case file:position(FileHdl, {bof, Offset}) of - {ok, Offset} -> + SeekRes = case SeekReq of + true -> case file:position(FileHdl, {bof, Offset}) of + {ok, Offset} -> ok; + KO -> KO + end; + false -> ok + end, + case SeekRes of + ok -> case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of {ok, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, @@ -1884,9 +1893,9 @@ read_message_at_offset(FileHdl, Offset, TotalSize) -> ?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> -> {ok, {MsgBody, true, BodySize}} end; - KO -> KO + KO1 -> KO1 end; - KO -> KO + KO2 -> KO2 end. scan_file_for_valid_messages(File) -> @@ -1931,7 +1940,8 @@ read_next_file_entry(FileHdl, Offset) -> {false, false} -> %% all good, let's continue case file:read(FileHdl, MsgIdBinSize) of {ok, <<MsgId:MsgIdBinSize/binary>>} -> - ExpectedAbsPos = Offset + TwoIntegers + TotalSize, + ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT + + TotalSize - 1, case file:position(FileHdl, {cur, TotalSize - MsgIdBinSize} ) of |