diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-16 13:24:53 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-16 13:24:53 +0100 |
commit | ed5237aa32212f894af50e6bb91e9aabe52c285c (patch) | |
tree | 82269d5c8ecec392bdcb3af8e0255e24ac14abd1 | |
parent | d7e03b29549f450f391e5a4aaab287f6cd871a48 (diff) | |
download | rabbitmq-server-ed5237aa32212f894af50e6bb91e9aabe52c285c.tar.gz |
Correct capture of msg-on-disk in tests as sync replacement; rip out explicit synchronous msg_store:sync and other unneeded logic
-rw-r--r-- | src/rabbit_msg_store.erl | 62 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 28 |
2 files changed, 27 insertions, 63 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 21a499c5..f86f90cc 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -21,7 +21,7 @@ -export([start_link/4, successfully_recovered_state/1, client_init/4, client_terminate/1, client_delete_and_terminate/1, client_ref/1, close_all_indicated/1, - write/3, read/2, contains/2, remove/2, sync/3]). + write/3, read/2, contains/2, remove/2]). -export([set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal @@ -60,7 +60,6 @@ current_file, %% current file name as number current_file_handle, %% current file handle since the last fsync? file_handle_cache, %% file handle cache - on_sync, %% pending sync requests sync_timer_ref, %% TRef for our interval timer sum_valid_data, %% sum of valid data in all files sum_file_size, %% sum of file sizes @@ -151,8 +150,6 @@ {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). -spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). --spec(sync/3 :: - ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). @@ -441,8 +438,6 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> server_cast(CState, {remove, CRef, MsgIds}). -sync(MsgIds, K, CState = #client_msstate { client_ref = CRef }) -> - server_cast(CState, {sync, CRef, MsgIds, K}). set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). @@ -639,7 +634,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> current_file = 0, current_file_handle = undefined, file_handle_cache = dict:new(), - on_sync = [], sync_timer_ref = undefined, sum_valid_data = 0, sum_file_size = 0, @@ -761,28 +755,6 @@ handle_cast({remove, CRef, MsgIds}, State) -> noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds), removed, State1))); -handle_cast({sync, CRef, MsgIds, K}, - State = #msstate { current_file = CurFile, - current_file_handle = CurHdl, - on_sync = Syncs, - dying_clients = DyingClients }) -> - case sets:is_element(CRef, DyingClients) of - true -> - noreply(State); - false -> - {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl), - case lists:any( - fun (MsgId) -> - #msg_location { file = File, offset = Offset } = - index_lookup(MsgId, State), - File =:= CurFile andalso Offset >= SyncOffset - end, MsgIds) of - false -> K(), - noreply(State); - true -> noreply(State #msstate { on_sync = [K | Syncs] }) - end - end; - handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, file_handles_ets = FileHandlesEts, @@ -861,17 +833,15 @@ reply(Reply, State) -> {reply, Reply, State1, Timeout}. next_state(State = #msstate { sync_timer_ref = undefined, - on_sync = Syncs, cref_to_msg_ids = CTM }) -> - case {Syncs, dict:size(CTM)} of - {[], 0} -> {State, hibernate}; - _ -> {start_sync_timer(State), 0} + case dict:size(CTM) of + 0 -> {State, hibernate}; + _ -> {start_sync_timer(State), 0} end; -next_state(State = #msstate { on_sync = Syncs, - cref_to_msg_ids = CTM }) -> - case {Syncs, dict:size(CTM)} of - {[], 0} -> {stop_sync_timer(State), hibernate}; - _ -> {State, 0} +next_state(State = #msstate { cref_to_msg_ids = CTM }) -> + case dict:size(CTM) of + 0 -> {stop_sync_timer(State), hibernate}; + _ -> {State, 0} end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> @@ -885,7 +855,6 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> State #msstate { sync_timer_ref = undefined }. internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs, cref_to_msg_ids = CTM }) -> State1 = stop_sync_timer(State), CGs = dict:fold(fun (CRef, MsgIds, NS) -> @@ -894,16 +863,13 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, false -> [{CRef, MsgIds} | NS] end end, [], CTM), - ok = case {Syncs, CGs} of - {[], []} -> ok; - _ -> file_handle_cache:sync(CurHdl) + ok = case CGs of + [] -> ok; + _ -> file_handle_cache:sync(CurHdl) end, - [K() || K <- lists:reverse(Syncs)], - State2 = lists:foldl( - fun ({CRef, MsgIds}, StateN) -> - client_confirm(CRef, MsgIds, written, StateN) - end, State1, CGs), - State2 #msstate { on_sync = [] }. + lists:foldl(fun ({CRef, MsgIds}, StateN) -> + client_confirm(CRef, MsgIds, written, StateN) + end, State1, CGs). write_action({true, not_found}, _MsgId, State) -> {ignore, undefined, State}; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 10fdd75c..90d0b583 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1768,23 +1768,21 @@ msg_store_client_init(MsgStore, Ref) -> rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). on_disk_capture() -> - on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}). + on_disk_capture({gb_sets:new(), undefined, undefined}). on_disk_capture({OnDisk, Awaiting, Pid}) -> receive - {on_disk, MsgIds} when Awaiting =/= undefined -> - Awaiting1 = gb_sets:subtract(Awaiting, MsgIds), - OnDisk1 = gb_sets:subtract(gb_sets:union(OnDisk, MsgIds), Awaiting), - case (not gb_sets:is_empty(Awaiting)) - andalso gb_sets:is_empty(Awaiting1) of - true -> Pid ! {self(), arrived}, - on_disk_capture({OnDisk1, undefined, undefined}); - false -> on_disk_capture({OnDisk1, Awaiting1, Pid}) + {await, MsgIds, Pid1} when Awaiting =:= undefined -> + Awaiting1 = gb_sets:subtract(MsgIds, OnDisk), + case gb_sets:is_empty(Awaiting1) of + true -> Pid1 ! {self(), arrived}, + on_disk_capture({OnDisk, undefined, undefined}); + false -> on_disk_capture({OnDisk, Awaiting1, Pid1}) end; - {on_disk, MsgIds} -> + {on_disk, MsgIds} when Awaiting =:= undefined -> on_disk_capture({gb_sets:union(OnDisk, MsgIds), Awaiting, Pid}); - {await, MsgIds, Pid} when Awaiting =/= undefined -> - OnDisk1 = gb_sets:subtract(OnDisk, MsgIds), - Awaiting1 = gb_sets:subtract(MsgIds, OnDisk), + {on_disk, MsgIds} -> + OnDisk1 = gb_sets:union(OnDisk, MsgIds), + Awaiting1 = gb_sets:subtract(Awaiting, MsgIds), case gb_sets:is_empty(Awaiting1) of true -> Pid ! {self(), arrived}, on_disk_capture({OnDisk1, undefined, undefined}); @@ -1794,8 +1792,8 @@ on_disk_capture({OnDisk, Awaiting, Pid}) -> done end. -on_disk_await(Pid, MsgIds) -> - Pid ! {await, MsgIds, self()}, +on_disk_await(Pid, MsgIds) when is_list(MsgIds) -> + Pid ! {await, gb_sets:from_list(MsgIds), self()}, receive {Pid, arrived} -> ok end. on_disk_stop(Pid) -> |