summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Matthew Sackman <matthew@lshift.net> 2009-07-10 13:24:25 +0100
committer Matthew Sackman <matthew@lshift.net> 2009-07-10 13:24:25 +0100
commit bd2f0801eb9754cd3b2de65ed5d82badfdcbd83f (patch)
tree 160b29d10ebe625c3490e7ce7997a4e72bca2536
parent 0bfe731400873c59abb1ccdb121d526e848dfbb8 (diff)
download rabbitmq-server-bd2f0801eb9754cd3b2de65ed5d82badfdcbd83f.tar.gz
Prefetch, part 1. When a mixed_queue sees that the next item in its queue is on disk, it issues a low-priority prefetch instruction to the disk queue, which populates the disk_queue's cache. Note that this shouldn't affect memory usage: because the mixed_queue is in mixed mode, the contents of the queue are already accounted for in memory even though they are on disk. The effect is that when the deliver comes, it doesn't need to go to disk to read the message, as the messages are already in the cache. Testing:
A 100,000 * 1Kb msg queue takes 15 seconds to drain (basic.get, noack) when the messages are in memory, in the mixed queue. On disk, without prefetch, it takes 32 seconds. On disk, with prefetch and a hot cache, it takes 25 seconds. The next step is to get the disk queue to signal back to the queue that the prefetch is done, and for the queue to grab the messages from the disk_queue in advance, meaning that on delivery all that is needed is for the async acks to be sent to the disk_queue (assuming the messages are not actually persistent).
-rw-r--r-- src/gen_server2.erl 5
-rw-r--r-- src/rabbit_disk_queue.erl 105
-rw-r--r-- src/rabbit_mixed_queue.erl 15
3 files changed, 85 insertions, 40 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 87b56ba3..cf54811f 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -508,8 +508,9 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue,
process_msg(Parent, Name, State, Mod,
Time, TimeoutState, Queue1, Debug,
case Time == hibernate of
- true -> roused_and_disinterested;
- false -> timeout
+ true -> {roused_and_disinterested, MinPri};
+ false when MinPri =:= any -> timeout;
+ false -> {timeout, MinPri}
end)
end
end.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 813ab7c4..3a520ecd 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -42,7 +42,7 @@
tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, purge/1, delete_queue/1,
delete_non_durable_queues/1, auto_ack_next_message/1,
- requeue_next_n/2
+ requeue_next_n/2, prefetch/2
]).
-export([filesync/0, cache_info/0]).
@@ -345,6 +345,9 @@ report_memory() ->
set_mode(Mode) ->
gen_server2:cast(?SERVER, {set_mode, Mode}).
+prefetch(Q, Count) ->
+ gen_server2:pcast(?SERVER, -1, {prefetch, Q, Count}).
+
%% ---- GEN-SERVER INTERNAL API ----
init([FileSizeLimit, ReadFileHandlesLimit]) ->
@@ -507,21 +510,28 @@ handle_cast({set_mode, Mode}, State) ->
mixed -> fun to_ram_disk_mode/1
end)(State));
handle_cast(report_memory, State) ->
- %% call noreply1/1, not noreply/1, as we don't want to restart the
+ %% call noreply1/2, not noreply/1/2, as we don't want to restart the
%% memory_report_timer
%% by unsetting the timer, we force a report on the next normal message
- noreply1(State #dqstate { memory_report_timer = undefined }).
+ noreply1(State #dqstate { memory_report_timer = undefined }, 0);
+handle_cast({prefetch, Q, Count}, State) ->
+ {ok, State1} = internal_prefetch(Q, Count, State),
+ noreply(State1, any). %% set minpri to any
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
-handle_info(timeout, State = #dqstate { commit_timer_ref = undefined }) ->
- ok = report_memory(true, State),
- %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer
- {noreply, stop_memory_timer(State), hibernate, 0};
-handle_info(timeout, State) ->
+handle_info({timeout, 0}, State = #dqstate { commit_timer_ref = undefined }) ->
+ %% this is the binary timeout coming back, with minpri = 0
+ %% don't use noreply/1/2 or noreply1/2 as they'll restart the memory timer
+ %% set timeout to 0, and go pick up any low priority messages
+ {noreply, stop_memory_timer(State), 0, any};
+handle_info({timeout, 0}, State) ->
+ %% must have commit_timer set, so timeout was 0, and we're not hibernating
noreply(sync_current_file_handle(State));
-handle_info(_Info, State) ->
- noreply(State).
+handle_info(timeout, State) ->
+ %% no minpri supplied, so it must have been 'any', so go hibernate
+ ok = report_memory(true, State),
+ {noreply, State, hibernate, any}.
terminate(_Reason, State) ->
shutdown(State).
@@ -643,30 +653,36 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only,
ets_bytes_per_record = undefined }.
noreply(NewState) ->
- noreply1(start_memory_timer(NewState)).
+ noreply(NewState, 0).
+
+noreply(NewState, MinPri) ->
+ noreply1(start_memory_timer(NewState), MinPri).
noreply1(NewState = #dqstate { on_sync_froms = [],
- commit_timer_ref = undefined }) ->
- {noreply, NewState, binary, 0};
-noreply1(NewState = #dqstate { commit_timer_ref = undefined }) ->
- {noreply, start_commit_timer(NewState), 0, 0};
-noreply1(NewState = #dqstate { on_sync_froms = [] }) ->
- {noreply, stop_commit_timer(NewState), binary, 0};
-noreply1(NewState) ->
- {noreply, NewState, 0, 0}.
+ commit_timer_ref = undefined }, MinPri) ->
+ {noreply, NewState, binary, MinPri};
+noreply1(NewState = #dqstate { commit_timer_ref = undefined }, MinPri) ->
+ {noreply, start_commit_timer(NewState), 0, MinPri};
+noreply1(NewState = #dqstate { on_sync_froms = [] }, MinPri) ->
+ {noreply, stop_commit_timer(NewState), binary, MinPri};
+noreply1(NewState, MinPri) ->
+ {noreply, NewState, 0, MinPri}.
reply(Reply, NewState) ->
- reply1(Reply, start_memory_timer(NewState)).
+ reply(Reply, NewState, 0).
+
+reply(Reply, NewState, MinPri) ->
+ reply1(Reply, start_memory_timer(NewState), MinPri).
reply1(Reply, NewState = #dqstate { on_sync_froms = [],
- commit_timer_ref = undefined }) ->
- {reply, Reply, NewState, binary, 0};
-reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) ->
- {reply, Reply, start_commit_timer(NewState), 0, 0};
-reply1(Reply, NewState = #dqstate { on_sync_froms = [] }) ->
- {reply, Reply, stop_commit_timer(NewState), binary, 0};
-reply1(Reply, NewState) ->
- {reply, Reply, NewState, 0, 0}.
+ commit_timer_ref = undefined }, MinPri) ->
+ {reply, Reply, NewState, binary, MinPri};
+reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }, MinPri) ->
+ {reply, Reply, start_commit_timer(NewState), 0, MinPri};
+reply1(Reply, NewState = #dqstate { on_sync_froms = [] }, MinPri) ->
+ {reply, Reply, stop_commit_timer(NewState), binary, MinPri};
+reply1(Reply, NewState, MinPri) ->
+ {reply, Reply, NewState, 0, MinPri}.
form_filename(Name) ->
filename:join(base_directory(), Name).
@@ -829,8 +845,12 @@ decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
ok.
insert_into_cache(Message = #basic_message { guid = MsgId },
- MsgSize, #dqstate { message_cache = Cache }) ->
- true = ets:insert_new(Cache, {MsgId, Message, MsgSize, 1}),
+ MsgSize, Forced, #dqstate { message_cache = Cache }) ->
+ Count = case Forced of
+ true -> 0;
+ false -> 1
+ end,
+ true = ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}),
ok.
%% ---- INTERNAL RAW FUNCTIONS ----
@@ -843,7 +863,7 @@ internal_deliver(Q, ReadMsg, FakeDeliver,
Remaining = WriteSeqId - ReadSeqId - 1,
{ok, Result, State1} =
internal_read_message(
- Q, ReadSeqId, FakeDeliver, ReadMsg, State),
+ Q, ReadSeqId, ReadMsg, FakeDeliver, false, State),
true = ets:insert(Sequences,
{Q, ReadSeqId+1, WriteSeqId}),
{ok,
@@ -856,7 +876,20 @@ internal_deliver(Q, ReadMsg, FakeDeliver,
end, State1}
end.
-internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
+internal_prefetch(Q, Count, State = #dqstate { sequences = Sequences }) ->
+ {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
+ Length = WriteSeqId - ReadSeqId,
+ Count1 = lists:min([Length, Count]),
+ StateN =
+ lists:foldl(
+ fun(N, State1) ->
+ {ok, _MsgStuff, State2} =
+ internal_read_message(Q, N, true, true, true, State1),
+ State2
+ end, State, lists:seq(ReadSeqId, ReadSeqId + Count1 - 1)),
+ {ok, StateN}.
+
+internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) ->
[Obj =
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] =
mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
@@ -876,14 +909,14 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
{ok, {MsgBody, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
Message = bin_to_msg(MsgBody),
- ok = case RefCount of
- 1 ->
+ ok = if RefCount > 1 orelse ForceInCache ->
+ insert_into_cache(Message, BodySize,
+ ForceInCache, State1);
+ true -> ok
%% it's not in the cache and we only
%% have 1 queue with the message. So
%% don't bother putting it in the
%% cache.
- ok;
- _ -> insert_into_cache(Message, BodySize, State1)
end,
{ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}},
State1};
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 61487c9d..2ef534ff 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -200,7 +200,8 @@ to_mixed_mode(TxnMessages, State =
%% don't actually do anything to the disk
MsgBuf = case Length of
0 -> queue:new();
- _ -> queue:from_list([{disk, Length}])
+ _ -> ok = rabbit_disk_queue:prefetch(Q, Length),
+ queue:from_list([{disk, Length}])
end,
%% remove txn messages from disk which are neither persistent and
%% durable. This is necessary to avoid leaks. This is also pretty
@@ -341,6 +342,7 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
end;
false -> noack
end,
+ ok = maybe_prefetch(Q, MsgBuf1),
{Msg1, IsDelivered1, AckTag1, MsgBuf1};
{disk, Rem1} ->
{Msg1 = #basic_message { is_persistent = IsPersistent },
@@ -353,7 +355,8 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
noack
end,
MsgBuf3 = case Rem1 of
- 1 -> MsgBuf1;
+ 1 -> ok = maybe_prefetch(Q, MsgBuf1),
+ MsgBuf1;
_ -> queue:in_r({disk, Rem1 - 1}, MsgBuf1)
end,
{Msg1, IsDelivered1, AckTag2, MsgBuf3}
@@ -362,6 +365,14 @@ deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
{{Msg, IsDelivered, AckTag, Rem},
State #mqstate { msg_buf = MsgBuf2, length = Rem }}.
+maybe_prefetch(Q, MsgBuf) ->
+ case queue:peek(MsgBuf) of
+ empty -> ok;
+ {value, {disk, Count}} -> rabbit_disk_queue:prefetch(Q, Count);
+ {value, _} -> ok
+ end.
+
+
remove_noacks(MsgsWithAcks) ->
{AckTags, ASize} =
lists:foldl(