summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-21 15:14:59 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-21 15:14:59 +0100
commit77d688e5fddb1daa4374f42698770e6737e10ea0 (patch)
tree1d108bd2b08cc32505c30956da62a806b4165488
parentc3c3e7fad74142ba446acb990c7b303ec9f71e48 (diff)
downloadrabbitmq-server-77d688e5fddb1daa4374f42698770e6737e10ea0.tar.gz
cosmetic -> with_queue_head => queue_head
Also, time for a new optimisation! YAY! Previously, reading a message off disk meant seeking to the correct position and then reading the data. Now if the handle is already in the right position, then that seek is a waste of quite a lot of time, as it is an OS call. Now, I cache the location of the handle and so avoid seeking when possible. This has a MASSIVE effect on performance, especially in straight line cases, eg where a single prefetcher can drain a queue of disk in about one third of the time it used to take. Just looking at the code coverage from the test suite, there were just 534 seeks and 8582 cases where we found the handle in the right position already. This is a fairly small amount of code, and provides very useful benefits.
-rw-r--r--src/rabbit_disk_queue.erl56
1 files changed, 33 insertions, 23 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 4b8759f8..d19469d6 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -754,7 +754,7 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts,
operation_mode = ram_disk }, Obj) ->
ets:match_object(MsgLocationEts, Obj).
-get_read_handle(File, Offset, State =
+get_read_handle(File, Offset, TotalSize, State =
#dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
read_file_handles_limit = ReadFileHandlesLimit,
current_file_name = CurName,
@@ -766,7 +766,8 @@ get_read_handle(File, Offset, State =
true -> State
end,
Now = now(),
- {FileHdl, ReadHdls1, ReadHdlsAge1} =
+ NewOffset = Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ {FileHdl, OldOffset, ReadHdls1, ReadHdlsAge1} =
case dict:find(File, ReadHdls) of
error ->
{ok, Hdl} = file:open(form_filename(File),
@@ -774,21 +775,21 @@ get_read_handle(File, Offset, State =
read_ahead]),
case dict:size(ReadHdls) < ReadFileHandlesLimit of
true ->
- {Hdl, ReadHdls, ReadHdlsAge};
- _False ->
+ {Hdl, 0, ReadHdls, ReadHdlsAge};
+ false ->
{Then, OldFile, ReadHdlsAge2} =
gb_trees:take_smallest(ReadHdlsAge),
- {ok, {OldHdl, Then}} =
+ {ok, {OldHdl, _Offset, Then}} =
dict:find(OldFile, ReadHdls),
ok = file:close(OldHdl),
- {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
+ {Hdl, 0, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
end;
- {ok, {Hdl, Then}} ->
- {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
+ {ok, {Hdl, OldOffset1, Then}} ->
+ {Hdl, OldOffset1, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
end,
- ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1),
+ ReadHdls2 = dict:store(File, {FileHdl, NewOffset, Now}, ReadHdls1),
ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
- {FileHdl,
+ {FileHdl, Offset /= OldOffset,
State1 #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3} }}.
sequence_lookup(Sequences, Q) ->
@@ -874,7 +875,7 @@ cache_is_full(#dqstate { message_cache = Cache }) ->
%% ---- INTERNAL RAW FUNCTIONS ----
internal_fetch_body(Q, MarkDelivered, Advance, State) ->
- case with_queue_head(Q, MarkDelivered, Advance, State) of
+ case queue_head(Q, MarkDelivered, Advance, State) of
E = {ok, empty, _} -> E;
{ok, AckTag, IsDelivered, StoreEntry, Remaining, State1} ->
{Message, State2} = read_stored_message(StoreEntry, State1),
@@ -882,7 +883,7 @@ internal_fetch_body(Q, MarkDelivered, Advance, State) ->
end.
internal_fetch_attributes(Q, MarkDelivered, Advance, State) ->
- case with_queue_head(Q, MarkDelivered, Advance, State) of
+ case queue_head(Q, MarkDelivered, Advance, State) of
E = {ok, empty, _} -> E;
{ok, AckTag, IsDelivered,
#message_store_entry { msg_id = MsgId, is_persistent = IsPersistent },
@@ -890,8 +891,8 @@ internal_fetch_attributes(Q, MarkDelivered, Advance, State) ->
{ok, {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}, State1}
end.
-with_queue_head(Q, MarkDelivered, Advance,
- State = #dqstate { sequences = Sequences }) ->
+queue_head(Q, MarkDelivered, Advance,
+ State = #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
{SeqId, SeqId} -> {ok, empty, State};
{ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId ->
@@ -913,9 +914,10 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
total_size = TotalSize }, State) ->
case fetch_and_increment_cache(MsgId, State) of
not_found ->
- {FileHdl, State1} = get_read_handle(File, Offset, State),
+ {FileHdl, SeekReq, State1} =
+ get_read_handle(File, Offset, TotalSize, State),
{ok, {MsgBody, _IsPersistent, EncodedBodySize}} =
- read_message_at_offset(FileHdl, Offset, TotalSize),
+ read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq),
Message = #basic_message {} = bin_to_msg(MsgBody),
ok = if RefCount > 1 ->
insert_into_cache(Message, EncodedBodySize, State1);
@@ -1480,7 +1482,7 @@ close_file(File, State = #dqstate { read_file_handles =
case dict:find(File, ReadHdls) of
error ->
State;
- {ok, {Hdl, Then}} ->
+ {ok, {Hdl, _Offset, Then}} ->
ok = file:close(Hdl),
State #dqstate { read_file_handles =
{ dict:erase(File, ReadHdls),
@@ -1867,10 +1869,17 @@ append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
KO -> KO
end.
-read_message_at_offset(FileHdl, Offset, TotalSize) ->
+read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq) ->
TotalSizeWriteOkBytes = TotalSize + 1,
- case file:position(FileHdl, {bof, Offset}) of
- {ok, Offset} ->
+ SeekRes = case SeekReq of
+ true -> case file:position(FileHdl, {bof, Offset}) of
+ {ok, Offset} -> ok;
+ KO -> KO
+ end;
+ false -> ok
+ end,
+ case SeekRes of
+ ok ->
case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of
{ok, <<TotalSize:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
@@ -1884,9 +1893,9 @@ read_message_at_offset(FileHdl, Offset, TotalSize) ->
?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> ->
{ok, {MsgBody, true, BodySize}}
end;
- KO -> KO
+ KO1 -> KO1
end;
- KO -> KO
+ KO2 -> KO2
end.
scan_file_for_valid_messages(File) ->
@@ -1931,7 +1940,8 @@ read_next_file_entry(FileHdl, Offset) ->
{false, false} -> %% all good, let's continue
case file:read(FileHdl, MsgIdBinSize) of
{ok, <<MsgId:MsgIdBinSize/binary>>} ->
- ExpectedAbsPos = Offset + TwoIntegers + TotalSize,
+ ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT +
+ TotalSize - 1,
case file:position(FileHdl,
{cur, TotalSize - MsgIdBinSize}
) of