author    Matthias Radestock <matthias@rabbitmq.com>    2011-08-17 12:36:43 +0100
committer Matthias Radestock <matthias@rabbitmq.com>    2011-08-17 12:36:43 +0100
commit    1a7940bbc28618ed34ed97eda70d6573631dd98d (patch)
tree      a0ff35d46f79a2ebaf63f013a533298d5f8c2b8d
parent    9ad783044191029d51600a8e1d81d04437329c7a (diff)
parent    efde7ce70136bf56042293f23549756bfd8a59bc (diff)
download  rabbitmq-server-1a7940bbc28618ed34ed97eda70d6573631dd98d.tar.gz
merge bug24340 into default
-rw-r--r--  src/file_handle_cache.erl   37
-rw-r--r--  src/rabbit_msg_store.erl    54
-rw-r--r--  src/rabbit_tests.erl        79
3 files changed, 65 insertions(+), 105 deletions(-)
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 235e14c0..3c2111dc 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -44,7 +44,6 @@
%% 4) You can find out what your 'real' offset is, and what your
%% 'virtual' offset is (i.e. where the hdl really is, and where it
%% would be after the write buffer is written out).
-%% 5) You can find out what the offset was when you last sync'd.
%%
%% There is also a server component which serves to limit the number
%% of open file descriptors. This is a hard limit: the server
@@ -144,8 +143,8 @@
-export([register_callback/3]).
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
- last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
- flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
+ current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3,
+ set_maximum_since_use/1, delete/1, clear/1]).
-export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0,
info/1]).
-export([ulimit/0]).
@@ -172,7 +171,6 @@
-record(handle,
{ hdl,
offset,
- trusted_offset,
is_dirty,
write_buffer_size,
write_buffer_size_limit,
@@ -240,7 +238,6 @@
-spec(sync/1 :: (ref()) -> ok_or_error()).
-spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
-spec(truncate/1 :: (ref()) -> ok_or_error()).
--spec(last_sync_offset/1 :: (ref()) -> val_or_error(offset())).
-spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())).
-spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())).
-spec(flush/1 :: (ref()) -> ok_or_error()).
@@ -365,11 +362,10 @@ sync(Ref) ->
[Ref],
fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
ok;
- ([Handle = #handle { hdl = Hdl, offset = Offset,
+ ([Handle = #handle { hdl = Hdl,
is_dirty = true, write_buffer = [] }]) ->
case file:sync(Hdl) of
- ok -> {ok, [Handle #handle { trusted_offset = Offset,
- is_dirty = false }]};
+ ok -> {ok, [Handle #handle { is_dirty = false }]};
Error -> {Error, [Handle]}
end
end).
@@ -384,21 +380,13 @@ position(Ref, NewOffset) ->
truncate(Ref) ->
with_flushed_handles(
[Ref],
- fun ([Handle1 = #handle { hdl = Hdl, offset = Offset,
- trusted_offset = TOffset }]) ->
+ fun ([Handle1 = #handle { hdl = Hdl }]) ->
case file:truncate(Hdl) of
- ok -> TOffset1 = lists:min([Offset, TOffset]),
- {ok, [Handle1 #handle { trusted_offset = TOffset1,
- at_eof = true }]};
+ ok -> {ok, [Handle1 #handle { at_eof = true }]};
Error -> {Error, [Handle1]}
end
end).
-last_sync_offset(Ref) ->
- with_handles([Ref], fun ([#handle { trusted_offset = TOffset }]) ->
- {ok, TOffset}
- end).
-
current_virtual_offset(Ref) ->
with_handles([Ref], fun ([#handle { at_eof = true, is_write = true,
offset = Offset,
@@ -456,8 +444,7 @@ clear(Ref) ->
write_buffer_size = 0 }) of
{{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
case file:truncate(Hdl) of
- ok -> {ok, [Handle1 #handle {trusted_offset = 0,
- at_eof = true }]};
+ ok -> {ok, [Handle1 #handle { at_eof = true }]};
Error -> {Error, [Handle1]}
end;
{{error, _} = Error, Handle1} ->
@@ -585,14 +572,13 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
end) of
{ok, Hdl} ->
Now = now(),
- {{ok, Offset1}, Handle1} =
+ {{ok, _Offset}, Handle1} =
maybe_seek(Offset, Handle #handle { hdl = Hdl,
offset = 0,
last_used_at = Now }),
- Handle2 = Handle1 #handle { trusted_offset = Offset1 },
- put({Ref, fhc_handle}, Handle2),
+ put({Ref, fhc_handle}, Handle1),
reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
- [{Ref, Handle2} | RefHdls]);
+ [{Ref, Handle1} | RefHdls]);
Error ->
%% NB: none of the handles in ToOpen are in the age tree
Oldest = oldest(Tree, fun () -> undefined end),
@@ -677,7 +663,6 @@ new_closed_handle(Path, Mode, Options) ->
Ref = make_ref(),
put({Ref, fhc_handle}, #handle { hdl = closed,
offset = 0,
- trusted_offset = 0,
is_dirty = false,
write_buffer_size = 0,
write_buffer_size_limit = WriteBufferSize,
@@ -705,7 +690,6 @@ soft_close(Handle = #handle { hdl = closed }) ->
soft_close(Handle) ->
case write_buffer(Handle) of
{ok, #handle { hdl = Hdl,
- offset = Offset,
is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
@@ -715,7 +699,6 @@ soft_close(Handle) ->
ok = file:close(Hdl),
age_tree_delete(Then),
{ok, Handle1 #handle { hdl = closed,
- trusted_offset = Offset,
is_dirty = false,
last_used_at = undefined }};
{_Error, _Handle} = Result ->
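
The hunks above drop the trusted_offset bookkeeping entirely: once nothing asks where the handle was when it last synced, sync/1 only needs the is_dirty flag to decide whether an fsync is due. A minimal standalone sketch of that pattern (not the module itself; module and record names are illustrative):

-module(sync_sketch).
-export([example/0]).

-record(h, {fd, dirty = false}).

%% every write marks the handle dirty
write(#h{fd = Fd} = H, Bytes) ->
    ok = file:write(Fd, Bytes),
    H#h{dirty = true}.

%% sync is a no-op on a clean handle; otherwise fsync and clear the flag
sync(#h{dirty = false} = H) -> H;
sync(#h{fd = Fd} = H)       -> ok = file:sync(Fd),
                               H#h{dirty = false}.

example() ->
    {ok, Fd} = file:open("sync_sketch.tmp", [write, raw, binary]),
    H1 = write(#h{fd = Fd}, <<"hello">>),
    H2 = sync(H1),
    H2 = sync(H2),                     %% clean handle: no second fsync
    ok = file:close(Fd),
    file:delete("sync_sketch.tmp").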
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 2be2e883..17d5f64b 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -21,7 +21,7 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2, sync/3]).
+ write/3, read/2, contains/2, remove/2]).
-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
delete_file/2]). %% internal
@@ -60,7 +60,6 @@
current_file, %% current file name as number
current_file_handle, %% current file handle since the last fsync?
file_handle_cache, %% file handle cache
- on_sync, %% pending sync requests
sync_timer_ref, %% TRef for our interval timer
sum_valid_data, %% sum of valid data in all files
sum_file_size, %% sum of file sizes
@@ -152,8 +151,6 @@
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
-spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
--spec(sync/3 ::
- ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
@@ -442,7 +439,6 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
server_cast(CState, {remove, CRef, MsgIds}).
-sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}).
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
@@ -639,7 +635,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
current_file = 0,
current_file_handle = undefined,
file_handle_cache = dict:new(),
- on_sync = [],
sync_timer_ref = undefined,
sum_valid_data = 0,
sum_file_size = 0,
@@ -761,21 +756,6 @@ handle_cast({remove, CRef, MsgIds}, State) ->
noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
removed, State1)));
-handle_cast({sync, MsgIds, K},
- State = #msstate { current_file = CurFile,
- current_file_handle = CurHdl,
- on_sync = Syncs }) ->
- {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
- case lists:any(fun (MsgId) ->
- #msg_location { file = File, offset = Offset } =
- index_lookup(MsgId, State),
- File =:= CurFile andalso Offset >= SyncOffset
- end, MsgIds) of
- false -> K(),
- noreply(State);
- true -> noreply(State #msstate { on_sync = [K | Syncs] })
- end;
-
handle_cast({combine_files, Source, Destination, Reclaimed},
State = #msstate { sum_file_size = SumFileSize,
file_handles_ets = FileHandlesEts,
@@ -854,17 +834,15 @@ reply(Reply, State) ->
{reply, Reply, State1, Timeout}.
next_state(State = #msstate { sync_timer_ref = undefined,
- on_sync = Syncs,
cref_to_msg_ids = CTM }) ->
- case {Syncs, dict:size(CTM)} of
- {[], 0} -> {State, hibernate};
- _ -> {start_sync_timer(State), 0}
+ case dict:size(CTM) of
+ 0 -> {State, hibernate};
+ _ -> {start_sync_timer(State), 0}
end;
-next_state(State = #msstate { on_sync = Syncs,
- cref_to_msg_ids = CTM }) ->
- case {Syncs, dict:size(CTM)} of
- {[], 0} -> {stop_sync_timer(State), hibernate};
- _ -> {State, 0}
+next_state(State = #msstate { cref_to_msg_ids = CTM }) ->
+ case dict:size(CTM) of
+ 0 -> {stop_sync_timer(State), hibernate};
+ _ -> {State, 0}
end.
start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
@@ -878,7 +856,6 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
State #msstate { sync_timer_ref = undefined }.
internal_sync(State = #msstate { current_file_handle = CurHdl,
- on_sync = Syncs,
cref_to_msg_ids = CTM }) ->
State1 = stop_sync_timer(State),
CGs = dict:fold(fun (CRef, MsgIds, NS) ->
@@ -887,16 +864,13 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
false -> [{CRef, MsgIds} | NS]
end
end, [], CTM),
- ok = case {Syncs, CGs} of
- {[], []} -> ok;
- _ -> file_handle_cache:sync(CurHdl)
+ ok = case CGs of
+ [] -> ok;
+ _ -> file_handle_cache:sync(CurHdl)
end,
- [K() || K <- lists:reverse(Syncs)],
- State2 = lists:foldl(
- fun ({CRef, MsgIds}, StateN) ->
- client_confirm(CRef, MsgIds, written, StateN)
- end, State1, CGs),
- State2 #msstate { on_sync = [] }.
+ lists:foldl(fun ({CRef, MsgIds}, StateN) ->
+ client_confirm(CRef, MsgIds, written, StateN)
+ end, State1, CGs).
write_action({true, not_found}, _MsgId, State) ->
{ignore, undefined, State};
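
With sync/3 and the on_sync queue gone, a client no longer hands the store a continuation to run after the fsync; instead it learns that messages have reached disk through the confirm callback registered in client_init/4, which the server invokes from internal_sync. A sketch of the calling shape, assuming a running persistent store under its usual registered name (msg_store_persistent) and an illustrative message id and payload:

Self = self(),
MsgId = erlang:md5(<<"example">>),
MSCState = rabbit_msg_store:client_init(
             msg_store_persistent, rabbit_guid:guid(),
             fun (MsgIds, _ActionTaken) ->          %% invoked after the fsync
                     Self ! {on_disk, gb_sets:to_list(MsgIds)}
             end, undefined),
ok = rabbit_msg_store:write(MsgId, <<"payload">>, MSCState),
receive {on_disk, Ids} -> true = lists:member(MsgId, Ids) end.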
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f36b21ca..a068efe5 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1824,25 +1824,49 @@ msg_id_bin(X) ->
msg_store_client_init(MsgStore, Ref) ->
rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
+on_disk_capture() ->
+ on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}).
+on_disk_capture({OnDisk, Awaiting, Pid}) ->
+ Pid1 = case Pid =/= undefined andalso gb_sets:is_empty(Awaiting) of
+ true -> Pid ! {self(), arrived}, undefined;
+ false -> Pid
+ end,
+ receive
+ {await, MsgIds, Pid2} ->
+ true = Pid1 =:= undefined andalso gb_sets:is_empty(Awaiting),
+ on_disk_capture({OnDisk, gb_sets:subtract(MsgIds, OnDisk), Pid2});
+ {on_disk, MsgIds} ->
+ on_disk_capture({gb_sets:union(OnDisk, MsgIds),
+ gb_sets:subtract(Awaiting, MsgIds),
+ Pid1});
+ stop ->
+ done
+ end.
+
+on_disk_await(Pid, MsgIds) when is_list(MsgIds) ->
+ Pid ! {await, gb_sets:from_list(MsgIds), self()},
+ receive {Pid, arrived} -> ok end.
+
+on_disk_stop(Pid) ->
+ MRef = erlang:monitor(process, Pid),
+ Pid ! stop,
+ receive {'DOWN', MRef, process, Pid, _Reason} ->
+ ok
+ end.
+
+msg_store_client_init_capture(MsgStore, Ref) ->
+ Pid = spawn(fun on_disk_capture/0),
+ {Pid, rabbit_msg_store:client_init(
+ MsgStore, Ref, fun (MsgIds, _ActionTaken) ->
+ Pid ! {on_disk, MsgIds}
+ end, undefined)}.
+
msg_store_contains(Atom, MsgIds, MSCState) ->
Atom = lists:foldl(
fun (MsgId, Atom1) when Atom1 =:= Atom ->
rabbit_msg_store:contains(MsgId, MSCState) end,
Atom, MsgIds).
-msg_store_sync(MsgIds, MSCState) ->
- Ref = make_ref(),
- Self = self(),
- ok = rabbit_msg_store:sync(MsgIds, fun () -> Self ! {sync, Ref} end,
- MSCState),
- receive
- {sync, Ref} -> ok
- after
- 10000 ->
- io:format("Sync from msg_store missing for msg_ids ~p~n", [MsgIds]),
- throw(timeout)
- end.
-
msg_store_read(MsgIds, MSCState) ->
lists:foldl(fun (MsgId, MSCStateM) ->
{{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(
@@ -1876,22 +1900,18 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
test_msg_store() ->
restart_msg_store_empty(),
- Self = self(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds),
Ref = rabbit_guid:guid(),
- MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
+ {Cap, MSCState} = msg_store_client_init_capture(?PERSISTENT_MSG_STORE, Ref),
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, MsgIds, MSCState),
%% publish the first half
ok = msg_store_write(MsgIds1stHalf, MSCState),
%% sync on the first half
- ok = msg_store_sync(MsgIds1stHalf, MSCState),
+ ok = on_disk_await(Cap, MsgIds1stHalf),
%% publish the second half
ok = msg_store_write(MsgIds2ndHalf, MSCState),
- %% sync on the first half again - the msg_store will be dirty, but
- %% we won't need the fsync
- ok = msg_store_sync(MsgIds1stHalf, MSCState),
%% check they're all in there
true = msg_store_contains(true, MsgIds, MSCState),
%% publish the latter half twice so we hit the caching and ref count code
@@ -1900,25 +1920,8 @@ test_msg_store() ->
true = msg_store_contains(true, MsgIds, MSCState),
%% sync on the 2nd half, but do lots of individual syncs to try
%% and cause coalescing to happen
- ok = lists:foldl(
- fun (MsgId, ok) -> rabbit_msg_store:sync(
- [MsgId], fun () -> Self ! {sync, MsgId} end,
- MSCState)
- end, ok, MsgIds2ndHalf),
- lists:foldl(
- fun(MsgId, ok) ->
- receive
- {sync, MsgId} -> ok
- after
- 10000 ->
- io:format("Sync from msg_store missing (msg_id: ~p)~n",
- [MsgId]),
- throw(timeout)
- end
- end, ok, MsgIds2ndHalf),
- %% it's very likely we're not dirty here, so the 1st half sync
- %% should hit a different code path
- ok = msg_store_sync(MsgIds1stHalf, MSCState),
+ ok = on_disk_await(Cap, MsgIds2ndHalf),
+ ok = on_disk_stop(Cap),
%% read them all
MSCState1 = msg_store_read(MsgIds, MSCState),
%% read them all again - this will hit the cache, not disk
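
Taken together, the rewritten test drives confirms end to end instead of issuing explicit syncs; the new helpers compose as in this condensed sketch of the flow already visible above:

{Cap, MSCState} = msg_store_client_init_capture(?PERSISTENT_MSG_STORE, Ref),
ok = msg_store_write(MsgIds, MSCState),
ok = on_disk_await(Cap, MsgIds),       %% blocks until confirms cover MsgIds
ok = on_disk_stop(Cap).                %% monitors, stops the capture process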