diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-08-17 12:36:43 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-08-17 12:36:43 +0100 |
commit | 1a7940bbc28618ed34ed97eda70d6573631dd98d (patch) | |
tree | a0ff35d46f79a2ebaf63f013a533298d5f8c2b8d | |
parent | 9ad783044191029d51600a8e1d81d04437329c7a (diff) | |
parent | efde7ce70136bf56042293f23549756bfd8a59bc (diff) | |
download | rabbitmq-server-1a7940bbc28618ed34ed97eda70d6573631dd98d.tar.gz |
merge bug24340 into default
-rw-r--r-- | src/file_handle_cache.erl | 37 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 54 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 79 |
3 files changed, 65 insertions, 105 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 235e14c0..3c2111dc 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -44,7 +44,6 @@ %% 4) You can find out what your 'real' offset is, and what your %% 'virtual' offset is (i.e. where the hdl really is, and where it %% would be after the write buffer is written out). -%% 5) You can find out what the offset was when you last sync'd. %% %% There is also a server component which serves to limit the number %% of open file descriptors. This is a hard limit: the server @@ -144,8 +143,8 @@ -export([register_callback/3]). -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, - last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, - flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). + current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, + set_maximum_since_use/1, delete/1, clear/1]). -export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, info/1]). -export([ulimit/0]). @@ -172,7 +171,6 @@ -record(handle, { hdl, offset, - trusted_offset, is_dirty, write_buffer_size, write_buffer_size_limit, @@ -240,7 +238,6 @@ -spec(sync/1 :: (ref()) -> ok_or_error()). -spec(position/2 :: (ref(), position()) -> val_or_error(offset())). -spec(truncate/1 :: (ref()) -> ok_or_error()). --spec(last_sync_offset/1 :: (ref()) -> val_or_error(offset())). -spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())). -spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())). -spec(flush/1 :: (ref()) -> ok_or_error()). @@ -365,11 +362,10 @@ sync(Ref) -> [Ref], fun ([#handle { is_dirty = false, write_buffer = [] }]) -> ok; - ([Handle = #handle { hdl = Hdl, offset = Offset, + ([Handle = #handle { hdl = Hdl, is_dirty = true, write_buffer = [] }]) -> case file:sync(Hdl) of - ok -> {ok, [Handle #handle { trusted_offset = Offset, - is_dirty = false }]}; + ok -> {ok, [Handle #handle { is_dirty = false }]}; Error -> {Error, [Handle]} end end). @@ -384,21 +380,13 @@ position(Ref, NewOffset) -> truncate(Ref) -> with_flushed_handles( [Ref], - fun ([Handle1 = #handle { hdl = Hdl, offset = Offset, - trusted_offset = TOffset }]) -> + fun ([Handle1 = #handle { hdl = Hdl }]) -> case file:truncate(Hdl) of - ok -> TOffset1 = lists:min([Offset, TOffset]), - {ok, [Handle1 #handle { trusted_offset = TOffset1, - at_eof = true }]}; + ok -> {ok, [Handle1 #handle { at_eof = true }]}; Error -> {Error, [Handle1]} end end). -last_sync_offset(Ref) -> - with_handles([Ref], fun ([#handle { trusted_offset = TOffset }]) -> - {ok, TOffset} - end). - current_virtual_offset(Ref) -> with_handles([Ref], fun ([#handle { at_eof = true, is_write = true, offset = Offset, @@ -456,8 +444,7 @@ clear(Ref) -> write_buffer_size = 0 }) of {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> case file:truncate(Hdl) of - ok -> {ok, [Handle1 #handle {trusted_offset = 0, - at_eof = true }]}; + ok -> {ok, [Handle1 #handle { at_eof = true }]}; Error -> {Error, [Handle1]} end; {{error, _} = Error, Handle1} -> @@ -585,14 +572,13 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, end) of {ok, Hdl} -> Now = now(), - {{ok, Offset1}, Handle1} = + {{ok, _Offset}, Handle1} = maybe_seek(Offset, Handle #handle { hdl = Hdl, offset = 0, last_used_at = Now }), - Handle2 = Handle1 #handle { trusted_offset = Offset1 }, - put({Ref, fhc_handle}, Handle2), + put({Ref, fhc_handle}, Handle1), reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), - [{Ref, Handle2} | RefHdls]); + [{Ref, Handle1} | RefHdls]); Error -> %% NB: none of the handles in ToOpen are in the age tree Oldest = oldest(Tree, fun () -> undefined end), @@ -677,7 +663,6 @@ new_closed_handle(Path, Mode, Options) -> Ref = make_ref(), put({Ref, fhc_handle}, #handle { hdl = closed, offset = 0, - trusted_offset = 0, is_dirty = false, write_buffer_size = 0, write_buffer_size_limit = WriteBufferSize, @@ -705,7 +690,6 @@ soft_close(Handle = #handle { hdl = closed }) -> soft_close(Handle) -> case write_buffer(Handle) of {ok, #handle { hdl = Hdl, - offset = Offset, is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of @@ -715,7 +699,6 @@ soft_close(Handle) -> ok = file:close(Hdl), age_tree_delete(Then), {ok, Handle1 #handle { hdl = closed, - trusted_offset = Offset, is_dirty = false, last_used_at = undefined }}; {_Error, _Handle} = Result -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2be2e883..17d5f64b 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -21,7 +21,7 @@ -export([start_link/4, successfully_recovered_state/1, client_init/4, client_terminate/1, client_delete_and_terminate/1, client_ref/1, close_all_indicated/1, - write/3, read/2, contains/2, remove/2, sync/3]). + write/3, read/2, contains/2, remove/2]). -export([set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal @@ -60,7 +60,6 @@ current_file, %% current file name as number current_file_handle, %% current file handle since the last fsync? file_handle_cache, %% file handle cache - on_sync, %% pending sync requests sync_timer_ref, %% TRef for our interval timer sum_valid_data, %% sum of valid data in all files sum_file_size, %% sum of file sizes @@ -152,8 +151,6 @@ {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). -spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). --spec(sync/3 :: - ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). @@ -442,7 +439,6 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> server_cast(CState, {remove, CRef, MsgIds}). -sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}). set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). @@ -639,7 +635,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> current_file = 0, current_file_handle = undefined, file_handle_cache = dict:new(), - on_sync = [], sync_timer_ref = undefined, sum_valid_data = 0, sum_file_size = 0, @@ -761,21 +756,6 @@ handle_cast({remove, CRef, MsgIds}, State) -> noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds), removed, State1))); -handle_cast({sync, MsgIds, K}, - State = #msstate { current_file = CurFile, - current_file_handle = CurHdl, - on_sync = Syncs }) -> - {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl), - case lists:any(fun (MsgId) -> - #msg_location { file = File, offset = Offset } = - index_lookup(MsgId, State), - File =:= CurFile andalso Offset >= SyncOffset - end, MsgIds) of - false -> K(), - noreply(State); - true -> noreply(State #msstate { on_sync = [K | Syncs] }) - end; - handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, file_handles_ets = FileHandlesEts, @@ -854,17 +834,15 @@ reply(Reply, State) -> {reply, Reply, State1, Timeout}. next_state(State = #msstate { sync_timer_ref = undefined, - on_sync = Syncs, cref_to_msg_ids = CTM }) -> - case {Syncs, dict:size(CTM)} of - {[], 0} -> {State, hibernate}; - _ -> {start_sync_timer(State), 0} + case dict:size(CTM) of + 0 -> {State, hibernate}; + _ -> {start_sync_timer(State), 0} end; -next_state(State = #msstate { on_sync = Syncs, - cref_to_msg_ids = CTM }) -> - case {Syncs, dict:size(CTM)} of - {[], 0} -> {stop_sync_timer(State), hibernate}; - _ -> {State, 0} +next_state(State = #msstate { cref_to_msg_ids = CTM }) -> + case dict:size(CTM) of + 0 -> {stop_sync_timer(State), hibernate}; + _ -> {State, 0} end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> @@ -878,7 +856,6 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> State #msstate { sync_timer_ref = undefined }. internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs, cref_to_msg_ids = CTM }) -> State1 = stop_sync_timer(State), CGs = dict:fold(fun (CRef, MsgIds, NS) -> @@ -887,16 +864,13 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, false -> [{CRef, MsgIds} | NS] end end, [], CTM), - ok = case {Syncs, CGs} of - {[], []} -> ok; - _ -> file_handle_cache:sync(CurHdl) + ok = case CGs of + [] -> ok; + _ -> file_handle_cache:sync(CurHdl) end, - [K() || K <- lists:reverse(Syncs)], - State2 = lists:foldl( - fun ({CRef, MsgIds}, StateN) -> - client_confirm(CRef, MsgIds, written, StateN) - end, State1, CGs), - State2 #msstate { on_sync = [] }. + lists:foldl(fun ({CRef, MsgIds}, StateN) -> + client_confirm(CRef, MsgIds, written, StateN) + end, State1, CGs). write_action({true, not_found}, _MsgId, State) -> {ignore, undefined, State}; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f36b21ca..a068efe5 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1824,25 +1824,49 @@ msg_id_bin(X) -> msg_store_client_init(MsgStore, Ref) -> rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). +on_disk_capture() -> + on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}). +on_disk_capture({OnDisk, Awaiting, Pid}) -> + Pid1 = case Pid =/= undefined andalso gb_sets:is_empty(Awaiting) of + true -> Pid ! {self(), arrived}, undefined; + false -> Pid + end, + receive + {await, MsgIds, Pid2} -> + true = Pid1 =:= undefined andalso gb_sets:is_empty(Awaiting), + on_disk_capture({OnDisk, gb_sets:subtract(MsgIds, OnDisk), Pid2}); + {on_disk, MsgIds} -> + on_disk_capture({gb_sets:union(OnDisk, MsgIds), + gb_sets:subtract(Awaiting, MsgIds), + Pid1}); + stop -> + done + end. + +on_disk_await(Pid, MsgIds) when is_list(MsgIds) -> + Pid ! {await, gb_sets:from_list(MsgIds), self()}, + receive {Pid, arrived} -> ok end. + +on_disk_stop(Pid) -> + MRef = erlang:monitor(process, Pid), + Pid ! stop, + receive {'DOWN', MRef, process, Pid, _Reason} -> + ok + end. + +msg_store_client_init_capture(MsgStore, Ref) -> + Pid = spawn(fun on_disk_capture/0), + {Pid, rabbit_msg_store:client_init( + MsgStore, Ref, fun (MsgIds, _ActionTaken) -> + Pid ! {on_disk, MsgIds} + end, undefined)}. + msg_store_contains(Atom, MsgIds, MSCState) -> Atom = lists:foldl( fun (MsgId, Atom1) when Atom1 =:= Atom -> rabbit_msg_store:contains(MsgId, MSCState) end, Atom, MsgIds). -msg_store_sync(MsgIds, MSCState) -> - Ref = make_ref(), - Self = self(), - ok = rabbit_msg_store:sync(MsgIds, fun () -> Self ! {sync, Ref} end, - MSCState), - receive - {sync, Ref} -> ok - after - 10000 -> - io:format("Sync from msg_store missing for msg_ids ~p~n", [MsgIds]), - throw(timeout) - end. - msg_store_read(MsgIds, MSCState) -> lists:foldl(fun (MsgId, MSCStateM) -> {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read( @@ -1876,22 +1900,18 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> test_msg_store() -> restart_msg_store_empty(), - Self = self(), MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds), Ref = rabbit_guid:guid(), - MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), + {Cap, MSCState} = msg_store_client_init_capture(?PERSISTENT_MSG_STORE, Ref), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, MsgIds, MSCState), %% publish the first half ok = msg_store_write(MsgIds1stHalf, MSCState), %% sync on the first half - ok = msg_store_sync(MsgIds1stHalf, MSCState), + ok = on_disk_await(Cap, MsgIds1stHalf), %% publish the second half ok = msg_store_write(MsgIds2ndHalf, MSCState), - %% sync on the first half again - the msg_store will be dirty, but - %% we won't need the fsync - ok = msg_store_sync(MsgIds1stHalf, MSCState), %% check they're all in there true = msg_store_contains(true, MsgIds, MSCState), %% publish the latter half twice so we hit the caching and ref count code @@ -1900,25 +1920,8 @@ test_msg_store() -> true = msg_store_contains(true, MsgIds, MSCState), %% sync on the 2nd half, but do lots of individual syncs to try %% and cause coalescing to happen - ok = lists:foldl( - fun (MsgId, ok) -> rabbit_msg_store:sync( - [MsgId], fun () -> Self ! {sync, MsgId} end, - MSCState) - end, ok, MsgIds2ndHalf), - lists:foldl( - fun(MsgId, ok) -> - receive - {sync, MsgId} -> ok - after - 10000 -> - io:format("Sync from msg_store missing (msg_id: ~p)~n", - [MsgId]), - throw(timeout) - end - end, ok, MsgIds2ndHalf), - %% it's very likely we're not dirty here, so the 1st half sync - %% should hit a different code path - ok = msg_store_sync(MsgIds1stHalf, MSCState), + ok = on_disk_await(Cap, MsgIds2ndHalf), + ok = on_disk_stop(Cap), %% read them all MSCState1 = msg_store_read(MsgIds, MSCState), %% read them all again - this will hit the cache, not disk |