summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-08-16 13:24:53 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-08-16 13:24:53 +0100
commited5237aa32212f894af50e6bb91e9aabe52c285c (patch)
tree82269d5c8ecec392bdcb3af8e0255e24ac14abd1
parentd7e03b29549f450f391e5a4aaab287f6cd871a48 (diff)
downloadrabbitmq-server-ed5237aa32212f894af50e6bb91e9aabe52c285c.tar.gz
Correct capture of msg-on-disk in tests as sync replacement; rip out explicit synchronous msg_store:sync and other unneeded logic
-rw-r--r--src/rabbit_msg_store.erl62
-rw-r--r--src/rabbit_tests.erl28
2 files changed, 27 insertions, 63 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 21a499c5..f86f90cc 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -21,7 +21,7 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2, sync/3]).
+ write/3, read/2, contains/2, remove/2]).
-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
delete_file/2]). %% internal
@@ -60,7 +60,6 @@
current_file, %% current file name as number
current_file_handle, %% current file handle since the last fsync?
file_handle_cache, %% file handle cache
- on_sync, %% pending sync requests
sync_timer_ref, %% TRef for our interval timer
sum_valid_data, %% sum of valid data in all files
sum_file_size, %% sum of file sizes
@@ -151,8 +150,6 @@
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
-spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
--spec(sync/3 ::
- ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
@@ -441,8 +438,6 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
server_cast(CState, {remove, CRef, MsgIds}).
-sync(MsgIds, K, CState = #client_msstate { client_ref = CRef }) ->
- server_cast(CState, {sync, CRef, MsgIds, K}).
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
@@ -639,7 +634,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
current_file = 0,
current_file_handle = undefined,
file_handle_cache = dict:new(),
- on_sync = [],
sync_timer_ref = undefined,
sum_valid_data = 0,
sum_file_size = 0,
@@ -761,28 +755,6 @@ handle_cast({remove, CRef, MsgIds}, State) ->
noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
removed, State1)));
-handle_cast({sync, CRef, MsgIds, K},
- State = #msstate { current_file = CurFile,
- current_file_handle = CurHdl,
- on_sync = Syncs,
- dying_clients = DyingClients }) ->
- case sets:is_element(CRef, DyingClients) of
- true ->
- noreply(State);
- false ->
- {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
- case lists:any(
- fun (MsgId) ->
- #msg_location { file = File, offset = Offset } =
- index_lookup(MsgId, State),
- File =:= CurFile andalso Offset >= SyncOffset
- end, MsgIds) of
- false -> K(),
- noreply(State);
- true -> noreply(State #msstate { on_sync = [K | Syncs] })
- end
- end;
-
handle_cast({combine_files, Source, Destination, Reclaimed},
State = #msstate { sum_file_size = SumFileSize,
file_handles_ets = FileHandlesEts,
@@ -861,17 +833,15 @@ reply(Reply, State) ->
{reply, Reply, State1, Timeout}.
next_state(State = #msstate { sync_timer_ref = undefined,
- on_sync = Syncs,
cref_to_msg_ids = CTM }) ->
- case {Syncs, dict:size(CTM)} of
- {[], 0} -> {State, hibernate};
- _ -> {start_sync_timer(State), 0}
+ case dict:size(CTM) of
+ 0 -> {State, hibernate};
+ _ -> {start_sync_timer(State), 0}
end;
-next_state(State = #msstate { on_sync = Syncs,
- cref_to_msg_ids = CTM }) ->
- case {Syncs, dict:size(CTM)} of
- {[], 0} -> {stop_sync_timer(State), hibernate};
- _ -> {State, 0}
+next_state(State = #msstate { cref_to_msg_ids = CTM }) ->
+ case dict:size(CTM) of
+ 0 -> {stop_sync_timer(State), hibernate};
+ _ -> {State, 0}
end.
start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
@@ -885,7 +855,6 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
State #msstate { sync_timer_ref = undefined }.
internal_sync(State = #msstate { current_file_handle = CurHdl,
- on_sync = Syncs,
cref_to_msg_ids = CTM }) ->
State1 = stop_sync_timer(State),
CGs = dict:fold(fun (CRef, MsgIds, NS) ->
@@ -894,16 +863,13 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
false -> [{CRef, MsgIds} | NS]
end
end, [], CTM),
- ok = case {Syncs, CGs} of
- {[], []} -> ok;
- _ -> file_handle_cache:sync(CurHdl)
+ ok = case CGs of
+ [] -> ok;
+ _ -> file_handle_cache:sync(CurHdl)
end,
- [K() || K <- lists:reverse(Syncs)],
- State2 = lists:foldl(
- fun ({CRef, MsgIds}, StateN) ->
- client_confirm(CRef, MsgIds, written, StateN)
- end, State1, CGs),
- State2 #msstate { on_sync = [] }.
+ lists:foldl(fun ({CRef, MsgIds}, StateN) ->
+ client_confirm(CRef, MsgIds, written, StateN)
+ end, State1, CGs).
write_action({true, not_found}, _MsgId, State) ->
{ignore, undefined, State};
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 10fdd75c..90d0b583 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1768,23 +1768,21 @@ msg_store_client_init(MsgStore, Ref) ->
rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
on_disk_capture() ->
- on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}).
+ on_disk_capture({gb_sets:new(), undefined, undefined}).
on_disk_capture({OnDisk, Awaiting, Pid}) ->
receive
- {on_disk, MsgIds} when Awaiting =/= undefined ->
- Awaiting1 = gb_sets:subtract(Awaiting, MsgIds),
- OnDisk1 = gb_sets:subtract(gb_sets:union(OnDisk, MsgIds), Awaiting),
- case (not gb_sets:is_empty(Awaiting))
- andalso gb_sets:is_empty(Awaiting1) of
- true -> Pid ! {self(), arrived},
- on_disk_capture({OnDisk1, undefined, undefined});
- false -> on_disk_capture({OnDisk1, Awaiting1, Pid})
+ {await, MsgIds, Pid1} when Awaiting =:= undefined ->
+ Awaiting1 = gb_sets:subtract(MsgIds, OnDisk),
+ case gb_sets:is_empty(Awaiting1) of
+ true -> Pid1 ! {self(), arrived},
+ on_disk_capture({OnDisk, undefined, undefined});
+ false -> on_disk_capture({OnDisk, Awaiting1, Pid1})
end;
- {on_disk, MsgIds} ->
+ {on_disk, MsgIds} when Awaiting =:= undefined ->
on_disk_capture({gb_sets:union(OnDisk, MsgIds), Awaiting, Pid});
- {await, MsgIds, Pid} when Awaiting =/= undefined ->
- OnDisk1 = gb_sets:subtract(OnDisk, MsgIds),
- Awaiting1 = gb_sets:subtract(MsgIds, OnDisk),
+ {on_disk, MsgIds} ->
+ OnDisk1 = gb_sets:union(OnDisk, MsgIds),
+ Awaiting1 = gb_sets:subtract(Awaiting, MsgIds),
case gb_sets:is_empty(Awaiting1) of
true -> Pid ! {self(), arrived},
on_disk_capture({OnDisk1, undefined, undefined});
@@ -1794,8 +1792,8 @@ on_disk_capture({OnDisk, Awaiting, Pid}) ->
done
end.
-on_disk_await(Pid, MsgIds) ->
- Pid ! {await, MsgIds, self()},
+on_disk_await(Pid, MsgIds) when is_list(MsgIds) ->
+ Pid ! {await, gb_sets:from_list(MsgIds), self()},
receive {Pid, arrived} -> ok end.
on_disk_stop(Pid) ->