author     Matthew Sackman <matthew@lshift.net>  2009-06-18 13:27:42 +0100
committer  Matthew Sackman <matthew@lshift.net>  2009-06-18 13:27:42 +0100
commit     61405ee6e1a2a02189f77ceddebfc471d917956c (patch)
tree       ded5d760a4fba08275f573c5ed10ae0129ea4360
parent     dcc60acba1f22da0497534f5227677c7cb4b8228 (diff)
download   rabbitmq-server-61405ee6e1a2a02189f77ceddebfc471d917956c.tar.gz
well, I think it works, but it's now much much slower.
-rw-r--r--  src/rabbit_disk_queue.erl  171
1 file changed, 100 insertions, 71 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 3370ef84..1e2226bb 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -44,7 +44,7 @@
dump_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1
]).
--export([length/1]).
+-export([length/1, filesync/0]).
-export([stop/0, stop_and_obliterate/0,
to_disk_only_mode/0, to_ram_disk_mode/0]).
@@ -68,6 +68,8 @@
-define(MAX_READ_FILE_HANDLES, 256).
-define(FILE_SIZE_LIMIT, (256*1024*1024)).
+-define(SYNC_INTERVAL, 5). %% milliseconds
+
-record(dqstate,
{msg_location_dets, %% where are messages?
msg_location_ets, %% as above, but for ets version
@@ -82,7 +84,9 @@
%% since the last fsync?
file_size_limit, %% how big can our files get?
read_file_handles, %% file handles for reading (LRU)
- read_file_handles_limit %% how many file handles can we open?
+ read_file_handles_limit, %% how many file handles can we open?
+ on_sync_functions, %% list of functions to run on sync (reversed)
+ timer_ref %% TRef for our interval timer
}).
%% The components:
@@ -260,6 +264,7 @@
-spec(to_ram_disk_mode/0 :: () -> 'ok').
-spec(to_disk_only_mode/0 :: () -> 'ok').
-spec(length/1 :: (queue_name()) -> non_neg_integer()).
+-spec(filesync/0 :: () -> 'ok').
-endif.
@@ -322,14 +327,17 @@ stop_and_obliterate() ->
gen_server2:call(?SERVER, stop_vaporise, infinity).
to_disk_only_mode() ->
- gen_server2:pcall(?SERVER, 10, to_disk_only_mode, infinity).
+ gen_server2:pcall(?SERVER, 9, to_disk_only_mode, infinity).
to_ram_disk_mode() ->
- gen_server2:pcall(?SERVER, 10, to_ram_disk_mode, infinity).
+ gen_server2:pcall(?SERVER, 9, to_ram_disk_mode, infinity).
length(Q) ->
gen_server2:call(?SERVER, {length, Q}, infinity).
+filesync() ->
+ gen_server2:pcast(?SERVER, 10, filesync).
+
%% ---- GEN-SERVER INTERNAL API ----
init([FileSizeLimit, ReadFileHandlesLimit]) ->
@@ -368,6 +376,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% seems to blow up if it is set private
MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
+ {ok, TRef} = timer:apply_interval(?SYNC_INTERVAL, ?MODULE, filesync, []),
+
InitName = "0" ++ ?FILE_EXTENSION,
State =
#dqstate { msg_location_dets = MsgLocationDets,
@@ -384,7 +394,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
current_dirty = false,
file_size_limit = FileSizeLimit,
read_file_handles = {dict:new(), gb_trees:empty()},
- read_file_handles_limit = ReadFileHandlesLimit
+ read_file_handles_limit = ReadFileHandlesLimit,
+ on_sync_functions = [],
+ timer_ref = TRef
},
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } =
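
The periodic flush is driven by the stdlib timer module: timer:apply_interval/4 arranges for ?MODULE:filesync/0 to be applied every ?SYNC_INTERVAL milliseconds and returns a TRef, which is kept in the new timer_ref field so that shutdown can cancel it. A minimal sketch of just that mechanism (the module name and the tick/0 body are illustrative, not part of the patch):

    -module(sync_timer_sketch).
    -export([start/0, stop/1, tick/0]).

    -define(SYNC_INTERVAL, 5). %% milliseconds, as in the patch

    start() ->
        %% timer:apply_interval/4 applies Module:Function(Args) every
        %% ?SYNC_INTERVAL ms until the returned TRef is cancelled
        {ok, TRef} = timer:apply_interval(?SYNC_INTERVAL, ?MODULE, tick, []),
        TRef.

    tick() ->
        %% stands in for rabbit_disk_queue:filesync/0, which pcasts to the server
        io:format("sync tick~n").

    stop(TRef) ->
        %% mirrors the timer:cancel(TRef) that shutdown/1 now performs
        {ok, cancel} = timer:cancel(TRef),
        ok.
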
@@ -406,20 +418,20 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
handle_call({publish, Q, MsgId, MsgBody}, _From, State) ->
{ok, MsgSeqId, State1} =
internal_publish(Q, MsgId, next, MsgBody, true, State),
- {reply, MsgSeqId, State1};
+ reply(MsgSeqId, State1);
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, true, false, State),
- {reply, Result, State1};
+ reply(Result, State1);
handle_call({phantom_deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, false, false, State),
- {reply, Result, State1};
-handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) ->
+ reply(Result, State1);
+handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
PubMsgSeqIds = zip_with_tail(PubMsgIds, {duplicate, next}),
- {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State),
- {reply, ok, State1};
+ {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, State),
+ noreply(State1);
handle_call({purge, Q}, _From, State) ->
{ok, Count, State1} = internal_purge(Q, State),
- {reply, Count, State1};
+ reply(Count, State1);
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(stop_vaporise, _From, State) ->
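
The important change in this hunk is that tx_commit no longer answers from inside handle_call/3: the clause now binds From instead of ignoring it, passes it to internal_tx_commit/5, and returns noreply/1, so the channel stays blocked in gen_server2:call/3 until a stored closure eventually runs gen_server2:reply(From, ok) once the data has been fsynced. The clause-level shape of that deferred-reply pattern, sketched with stock gen_server and a made-up #state record (a fuller self-contained sketch follows the end of the diff):

    handle_call({commit, _Data}, From, State = #state { pending = Pending }) ->
        %% buffer the write and remember who asked, but do NOT reply yet;
        %% the parked caller is released later with gen_server:reply(From, ok)
        {noreply, State #state { pending = [From | Pending] }}.
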
@@ -436,7 +448,7 @@ handle_call(stop_vaporise, _From, State) ->
%% gen_server now calls terminate, which then calls shutdown
handle_call(to_disk_only_mode, _From,
State = #dqstate { operation_mode = disk_only }) ->
- {reply, ok, State};
+ reply(ok, State);
handle_call(to_disk_only_mode, _From,
State = #dqstate { operation_mode = ram_disk,
msg_location_dets = MsgLocationDets,
@@ -446,10 +458,10 @@ handle_call(to_disk_only_mode, _From,
disc_only_copies),
ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
true = ets:delete_all_objects(MsgLocationEts),
- {reply, ok, State #dqstate { operation_mode = disk_only }};
+ reply(ok, State #dqstate { operation_mode = disk_only });
handle_call(to_ram_disk_mode, _From,
State = #dqstate { operation_mode = ram_disk }) ->
- {reply, ok, State};
+ reply(ok, State);
handle_call(to_ram_disk_mode, _From,
State = #dqstate { operation_mode = disk_only,
msg_location_dets = MsgLocationDets,
@@ -459,46 +471,50 @@ handle_call(to_ram_disk_mode, _From,
disc_copies),
true = ets:from_dets(MsgLocationEts, MsgLocationDets),
ok = dets:delete_all_objects(MsgLocationDets),
- {reply, ok, State #dqstate { operation_mode = ram_disk }};
+ reply(ok, State #dqstate { operation_mode = ram_disk });
handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
{_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q),
- {reply, Length, State};
+ reply(Length, State);
handle_call({dump_queue, Q}, _From, State) ->
{Result, State1} = internal_dump_queue(Q, State),
- {reply, Result, State1};
+ reply(Result, State1);
handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
{ok, State1} = internal_delete_non_durable_queues(DurableQueues, State),
- {reply, ok, State1}.
+ reply(ok, State1).
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
{ok, _MsgSeqId, State1} =
internal_publish(Q, MsgId, next, MsgBody, false, State),
- {noreply, State1};
+ noreply(State1);
handle_cast({ack, Q, MsgSeqIds}, State) ->
{ok, State1} = internal_ack(Q, MsgSeqIds, State),
- {noreply, State1};
+ noreply(State1);
handle_cast({auto_ack_next_message, Q}, State) ->
{ok, State1} = internal_auto_ack(Q, State),
- {noreply, State1};
+ noreply(State1);
handle_cast({tx_publish, MsgId, MsgBody}, State) ->
{ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
- {noreply, State1};
+ noreply(State1);
handle_cast({tx_cancel, MsgIds}, State) ->
{ok, State1} = internal_tx_cancel(MsgIds, State),
- {noreply, State1};
+ noreply(State1);
handle_cast({requeue, Q, MsgSeqIds}, State) ->
MsgSeqSeqIds = zip_with_tail(MsgSeqIds, {duplicate, {next, true}}),
{ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State),
- {noreply, State1};
+ noreply(State1);
handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) ->
{ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State),
- {noreply, State1};
+ noreply(State1);
handle_cast({delete_queue, Q}, State) ->
{ok, State1} = internal_delete_queue(Q, State),
- {noreply, State1}.
+ noreply(State1);
+handle_cast(filesync, State) ->
+ noreply(sync_current_file_handle(State)).
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
+handle_info(timeout, State = #dqstate { current_dirty = true }) ->
+ noreply(sync_current_file_handle(State));
handle_info(_Info, State) ->
{noreply, State}.
@@ -508,16 +524,18 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts,
current_file_handle = FileHdl,
- read_file_handles = {ReadHdls, _ReadHdlsAge}
+ read_file_handles = {ReadHdls, _ReadHdlsAge},
+ timer_ref = TRef
}) ->
%% deliberately ignoring return codes here
+ timer:cancel(TRef),
dets:close(MsgLocationDets),
file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
?FILE_EXTENSION_DETS)),
true = ets:delete_all_objects(MsgLocationEts),
case FileHdl of
undefined -> ok;
- _ -> file:sync(FileHdl),
+ _ -> sync_current_file_handle(State),
file:close(FileHdl)
end,
dict:fold(fun (_File, Hdl, _Acc) ->
@@ -525,13 +543,25 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
end, ok, ReadHdls),
State #dqstate { current_file_handle = undefined,
current_dirty = false,
- read_file_handles = {dict:new(), gb_trees:empty()}}.
+ read_file_handles = {dict:new(), gb_trees:empty()},
+ timer_ref = undefined
+ }.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ---- UTILITY FUNCTIONS ----
+noreply(NewState = #dqstate { current_dirty = true }) ->
+ {noreply, NewState, 0};
+noreply(NewState) ->
+ {noreply, NewState, infinity}.
+
+reply(Reply, NewState = #dqstate { current_dirty = true }) ->
+ {reply, Reply, NewState, 0};
+reply(Reply, NewState) ->
+ {reply, Reply, NewState, infinity}.
+
form_filename(Name) ->
filename:join(base_directory(), Name).
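
These two helpers carry the scheduling trick. Whenever the current file is dirty they return a gen_server timeout of 0; the stock gen_server loop always serves a message already sitting in the mailbox before the timeout, so a busy server keeps handling publishes and commits (with the 5ms filesync cast bounding how long data stays unsynced), while a server that has drained its mailbox gets handle_info(timeout, State) straight away and syncs immediately. A minimal fragment of that idle-flush behaviour, with flush/1 and #state standing in for sync_current_file_handle/1 and #dqstate (the consolidated sketch after the diff spells it out fully):

    %% {noreply, State, 0} and {reply, Reply, State, 0} arm a zero-length
    %% timeout; gen_server serves any queued message first, so this clause
    %% only runs once the mailbox is empty, i.e. when the server goes idle
    handle_info(timeout, State = #state { dirty = true }) ->
        {noreply, flush(State), infinity}.
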
@@ -613,14 +643,12 @@ get_read_handle(File, State =
#dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
read_file_handles_limit = ReadFileHandlesLimit,
current_file_name = CurName,
- current_file_handle = CurHdl,
current_dirty = IsDirty
}) ->
- IsDirty1 = if CurName =:= File andalso IsDirty ->
- file:sync(CurHdl),
- false;
- true -> IsDirty
- end,
+ State1 = if CurName =:= File andalso IsDirty ->
+ sync_current_file_handle(State);
+ true -> State
+ end,
Now = now(),
{FileHdl, ReadHdls1, ReadHdlsAge1} =
case dict:find(File, ReadHdls) of
@@ -644,9 +672,8 @@ get_read_handle(File, State =
end,
ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1),
ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
- {FileHdl, State #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3},
- current_dirty = IsDirty1
- }}.
+ {FileHdl,
+ State1 #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3} }}.
adjust_last_msg_seq_id(_Q, ExpectedSeqId, next, _Mode) ->
ExpectedSeqId;
@@ -676,6 +703,17 @@ sequence_lookup(Sequences, Q) ->
{ReadSeqId, WriteSeqId, Length}
end.
+sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl,
+ current_dirty = IsDirty,
+ on_sync_functions = Funcs
+ }) ->
+ ok = case IsDirty of
+ true -> file:sync(CurHdl);
+ false -> ok
+ end,
+ lists:map(fun (Fun) -> Fun() end, lists:reverse(Funcs)),
+ State #dqstate { current_dirty = false, on_sync_functions = [] }.
+
%% ---- INTERNAL RAW FUNCTIONS ----
internal_deliver(Q, ReadMsg, FakeDeliver,
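
sync_current_file_handle/1 becomes the single place where file:sync/1 happens: it syncs only if the handle is dirty, then runs every queued on-sync closure and clears both the list and the dirty flag. The closures are prepended as commits arrive, so the lists:reverse/1 restores submission order and blocked callers are released oldest first; since the results are thrown away, lists:foreach/2 would state that intent a little more directly than lists:map/2. As a tiny illustration (run_on_sync is a made-up name):

    %% on_sync_functions is built newest-first by prepending, so reversing it
    %% runs the closures in the order the commits arrived
    run_on_sync(Funs) ->
        lists:foreach(fun (Fun) -> Fun() end, lists:reverse(Funs)).
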
@@ -818,12 +856,10 @@ internal_tx_publish(MsgId, MsgBody,
end.
%% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next))
-internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
- State = #dqstate { current_file_handle = CurHdl,
- current_file_name = CurName,
- current_dirty = IsDirty,
- sequences = Sequences
- }) ->
+internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
+ State = #dqstate { sequences = Sequences,
+ on_sync_functions = SyncFuncs
+ }) ->
{PubList, PubAcc, ReadSeqId, Length} =
case PubMsgSeqIds of
[] -> {[], undefined, undefined, undefined};
@@ -835,7 +871,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
{ zip_with_tail(PubMsgSeqIds, {last, {next, next}}),
InitWriteSeqId, InitReadSeqId1, InitLength}
end,
- {atomic, {Sync, WriteSeqId, State1}} =
+ {atomic, {WriteSeqId, State1}} =
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
@@ -844,11 +880,11 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
%% it's been published, which is clearly
%% nonsense. I.e. in commit, do not do things in an
%% order which _could_not_ have happened.
- {Sync1, WriteSeqId1} =
+ WriteSeqId1 =
lists:foldl(
fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}},
- {Acc, ExpectedSeqId}) ->
- [{MsgId, _RefCount, File, _Offset,
+ ExpectedSeqId) ->
+ [{MsgId, _RefCount, _File, _Offset,
_TotalSize}] = dets_ets_lookup(State, MsgId),
SeqId1 = adjust_last_msg_seq_id(
Q, ExpectedSeqId, SeqId, write),
@@ -863,23 +899,21 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
next_seq_id = NextSeqId1
},
write),
- {Acc orelse (CurName =:= File), NextSeqId1}
- end, {false, PubAcc}, PubList),
-
+ NextSeqId1
+ end, PubAcc, PubList),
{ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
- {Sync1, WriteSeqId1, State2}
+ {WriteSeqId1, State2}
end),
true = case PubList of
[] -> true;
_ -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId,
Length + erlang:length(PubList)})
end,
- IsDirty1 = if IsDirty andalso Sync ->
- ok = file:sync(CurHdl),
- false;
- true -> IsDirty
- end,
- {ok, State1 #dqstate { current_dirty = IsDirty1 }}.
+ {ok,
+ State1 #dqstate { on_sync_functions = [fun() ->
+ gen_server2:reply(From, ok)
+ end | SyncFuncs]}
+ }.
%% SeqId can be 'next'
internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) ->
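
With the reply deferred, internal_tx_commit no longer needs the current file handle, file name, dirty flag, or the Sync accumulator that used to decide whether to fsync inside the commit: it records which queue entries were written and parks a closure capturing From on on_sync_functions. The acknowledgement then rides on whichever sync comes first, the 5ms interval timer or the idle timeout, so one fsync can cover a whole batch of commits while each individual commit waits for that shared sync, which is presumably the trade the commit message is reporting on. From the caller's side nothing changes shape; the added latency could be observed with stock timer:tc/3, assuming the module's existing tx_commit/3 wrapper is unchanged and with Q, PubMsgIds and AckSeqIds bound to whatever the channel was committing:

    %% per-commit latency is now roughly time-to-next-sync plus the shared
    %% fsync, instead of a dedicated file:sync/1 per commit
    {Micros, ok} = timer:tc(rabbit_disk_queue, tx_commit, [Q, PubMsgIds, AckSeqIds]).
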
@@ -1051,14 +1085,10 @@ maybe_roll_to_new_file(Offset,
current_file_name = CurName,
current_file_handle = CurHdl,
current_file_num = CurNum,
- current_dirty = IsDirty,
file_summary = FileSummary
}
) when Offset >= FileSizeLimit ->
- ok = case IsDirty of
- true -> file:sync(CurHdl);
- false -> ok
- end,
+ State1 = sync_current_file_handle(State),
ok = file:close(CurHdl),
NextNum = CurNum + 1,
NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION,
@@ -1067,13 +1097,12 @@ maybe_roll_to_new_file(Offset,
ok = preallocate(NextHdl, FileSizeLimit, 0),
true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right
true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}),
- State1 = State #dqstate { current_file_name = NextName,
- current_file_handle = NextHdl,
- current_file_num = NextNum,
- current_offset = 0,
- current_dirty = false
- },
- {ok, compact(sets:from_list([CurName]), State1)};
+ State2 = State1 #dqstate { current_file_name = NextName,
+ current_file_handle = NextHdl,
+ current_file_num = NextNum,
+ current_offset = 0
+ },
+ {ok, compact(sets:from_list([CurName]), State2)};
maybe_roll_to_new_file(_, State) ->
{ok, State}.
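
Taken together, the new control flow is: publishes mark the state dirty; each tx_commit parks a reply closure; a sync runs either when the interval timer casts filesync or when the server drains its mailbox and the zero timeout fires; the sync calls file:sync/1 once and then releases every parked caller. A self-contained sketch of that shape, using stock gen_server, no real file handle, and made-up names throughout (a reading of the patch, not RabbitMQ code):

    -module(batched_sync_sketch).
    -behaviour(gen_server).

    -export([start_link/0, publish/1, commit/0, sync/0]).
    -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
             terminate/2, code_change/3]).

    -define(SYNC_INTERVAL, 5). %% milliseconds, as in the patch

    -record(state, { dirty = false, %% unsynced writes outstanding?
                     on_sync = [],  %% closures to run after the next sync
                     timer_ref }).

    start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

    publish(Msg) -> gen_server:cast(?MODULE, {publish, Msg}).
    commit()     -> gen_server:call(?MODULE, commit, infinity).
    sync()       -> gen_server:cast(?MODULE, sync). %% what the timer calls

    init([]) ->
        {ok, TRef} = timer:apply_interval(?SYNC_INTERVAL, ?MODULE, sync, []),
        {ok, #state { timer_ref = TRef }}.

    handle_call(commit, From, State = #state { on_sync = Funs }) ->
        %% park the caller: it is only answered after the next sync
        Fun = fun () -> gen_server:reply(From, ok) end,
        noreply(State #state { on_sync = [Fun | Funs] }).

    handle_cast({publish, _Msg}, State) ->
        %% pretend the message was appended to the current, unsynced file
        noreply(State #state { dirty = true });
    handle_cast(sync, State) ->
        noreply(do_sync(State)).

    handle_info(timeout, State) -> %% the mailbox drained while we were dirty
        noreply(do_sync(State));
    handle_info(_Info, State) ->
        noreply(State).

    terminate(_Reason, #state { timer_ref = TRef }) ->
        timer:cancel(TRef),
        ok.

    code_change(_OldVsn, State, _Extra) -> {ok, State}.

    %% ask for a zero timeout whenever unsynced data is pending
    noreply(State = #state { dirty = true }) -> {noreply, State, 0};
    noreply(State)                           -> {noreply, State, infinity}.

    do_sync(State = #state { on_sync = Funs }) ->
        %% the real module calls file:sync(CurHdl) here when current_dirty is set
        lists:foreach(fun (Fun) -> Fun() end, lists:reverse(Funs)),
        State #state { dirty = false, on_sync = [] }.

Calling batched_sync_sketch:commit() after a few publishes shows the caller blocking only until the next flush, never issuing its own fsync, which is the behaviour the patch introduces for tx_commit.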