summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-16 13:25:48 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-16 13:25:48 +0100
commite5a6fa10bc89cec088b3a6d9dc442c7ee26754a8 (patch)
tree0824dd232493525a88c6ba02f18343a05377ba26
parent2801f9c5fa4653688bf09e10c02ca5da2b2fe14a (diff)
parent097d324a9db06bc40c2805e9ca3caca0278694c7 (diff)
downloadrabbitmq-server-e5a6fa10bc89cec088b3a6d9dc442c7ee26754a8.tar.gz
merge default into bug24386
-rw-r--r--src/file_handle_cache.erl30
-rw-r--r--src/filelib2.erl428
-rw-r--r--src/rabbit_misc.erl71
-rw-r--r--src/rabbit_queue_index.erl27
-rw-r--r--src/serialiser.erl75
5 files changed, 582 insertions, 49 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index e14dfe22..d4d7cdda 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -318,7 +318,7 @@ read(Ref, Count) ->
fun ([#handle { is_read = false }]) ->
{error, not_open_for_reading};
([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
- case file:read(Hdl, Count) of
+ case prim_file:read(Hdl, Count) of
{ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
{Obj,
[Handle #handle { offset = Offset1 }]};
@@ -338,7 +338,7 @@ append(Ref, Data) ->
write_buffer_size_limit = 0,
at_eof = true } = Handle1} ->
Offset1 = Offset + iolist_size(Data),
- {file:write(Hdl, Data),
+ {prim_file:write(Hdl, Data),
[Handle1 #handle { is_dirty = true, offset = Offset1 }]};
{{ok, _Offset}, #handle { write_buffer = WriteBuffer,
write_buffer_size = Size,
@@ -365,7 +365,7 @@ sync(Ref) ->
ok;
([Handle = #handle { hdl = Hdl,
is_dirty = true, write_buffer = [] }]) ->
- case file:sync(Hdl) of
+ case prim_file:sync(Hdl) of
ok -> {ok, [Handle #handle { is_dirty = false }]};
Error -> {Error, [Handle]}
end
@@ -382,7 +382,7 @@ truncate(Ref) ->
with_flushed_handles(
[Ref],
fun ([Handle1 = #handle { hdl = Hdl }]) ->
- case file:truncate(Hdl) of
+ case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
Error -> {Error, [Handle1]}
end
@@ -409,7 +409,7 @@ copy(Src, Dest, Count) ->
fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset },
DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
) ->
- case file:copy(SHdl, DHdl, Count) of
+ case prim_file:copy(SHdl, DHdl, Count) of
{ok, Count1} = Result1 ->
{Result1,
[SHandle #handle { offset = SOffset + Count1 },
@@ -429,7 +429,7 @@ delete(Ref) ->
Handle = #handle { path = Path } ->
case hard_close(Handle #handle { is_dirty = false,
write_buffer = [] }) of
- ok -> file:delete(Path);
+ ok -> prim_file:delete(Path);
{Error, Handle1} -> put_handle(Ref, Handle1),
Error
end
@@ -444,7 +444,7 @@ clear(Ref) ->
case maybe_seek(bof, Handle #handle { write_buffer = [],
write_buffer_size = 0 }) of
{{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
- case file:truncate(Hdl) of
+ case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
Error -> {Error, [Handle1]}
end;
@@ -567,10 +567,10 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
offset = Offset,
last_used_at = undefined }} |
RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
- case file:open(Path, case NewOrReopen of
- new -> Mode;
- reopen -> [read | Mode]
- end) of
+ case prim_file:open(Path, case NewOrReopen of
+ new -> Mode;
+ reopen -> [read | Mode]
+ end) of
{ok, Hdl} ->
Now = now(),
{{ok, _Offset}, Handle1} =
@@ -694,10 +694,10 @@ soft_close(Handle) ->
is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
- true -> file:sync(Hdl);
+ true -> prim_file:sync(Hdl);
false -> ok
end,
- ok = file:close(Hdl),
+ ok = prim_file:close(Hdl),
age_tree_delete(Then),
{ok, Handle1 #handle { hdl = closed,
is_dirty = false,
@@ -732,7 +732,7 @@ maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
at_eof = AtEoF }) ->
{AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
case (case NeedsSeek of
- true -> file:position(Hdl, NewOffset);
+ true -> prim_file:position(Hdl, NewOffset);
false -> {ok, Offset}
end) of
{ok, Offset1} = Result ->
@@ -769,7 +769,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
write_buffer = WriteBuffer,
write_buffer_size = DataSize,
at_eof = true }) ->
- case file:write(Hdl, lists:reverse(WriteBuffer)) of
+ case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of
ok ->
Offset1 = Offset + DataSize,
{ok, Handle #handle { offset = Offset1, is_dirty = true,
diff --git a/src/filelib2.erl b/src/filelib2.erl
new file mode 100644
index 00000000..e92396ea
--- /dev/null
+++ b/src/filelib2.erl
@@ -0,0 +1,428 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1997-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+
+%% This is the 'filelib' module ported from R13B03. The only changes
+%% are replacing calls to 'file' with calls to 'prim_file', removing
+%% most of the specs and inlining the call to error.
+
+-module(filelib2).
+
+%% File utilities.
+
+-export([wildcard/1, wildcard/2, is_dir/1, is_file/1, is_regular/1,
+ compile_wildcard/1]).
+-export([fold_files/5, last_modified/1, file_size/1, ensure_dir/1]).
+
+-export([wildcard/3, is_dir/2, is_file/2, is_regular/2]).
+-export([fold_files/6, last_modified/2, file_size/2]).
+
+-include_lib("kernel/include/file.hrl").
+
+-define(HANDLE_ERROR(Expr),
+ try
+ Expr
+ catch
+ error:{badpattern,_}=UnUsUalVaRiAbLeNaMe ->
+ %% Get the stack backtrace correct.
+ erlang:error(UnUsUalVaRiAbLeNaMe)
+ end).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+wildcard(Pattern) when is_list(Pattern) ->
+ ?HANDLE_ERROR(do_wildcard(Pattern, file)).
+
+wildcard(Pattern, Cwd) when is_list(Pattern), is_list(Cwd) ->
+ ?HANDLE_ERROR(do_wildcard(Pattern, Cwd, file));
+wildcard(Pattern, Mod) when is_list(Pattern), is_atom(Mod) ->
+ ?HANDLE_ERROR(do_wildcard(Pattern, Mod)).
+
+wildcard(Pattern, Cwd, Mod)
+ when is_list(Pattern), is_list(Cwd), is_atom(Mod) ->
+ ?HANDLE_ERROR(do_wildcard(Pattern, Cwd, Mod)).
+
+is_dir(Dir) ->
+ do_is_dir(Dir, file).
+
+is_dir(Dir, Mod) when is_atom(Mod) ->
+ do_is_dir(Dir, Mod).
+
+is_file(File) ->
+ do_is_file(File, file).
+
+is_file(File, Mod) when is_atom(Mod) ->
+ do_is_file(File, Mod).
+
+is_regular(File) ->
+ do_is_regular(File, file).
+
+is_regular(File, Mod) when is_atom(Mod) ->
+ do_is_regular(File, Mod).
+
+fold_files(Dir, RegExp, Recursive, Fun, Acc) ->
+ do_fold_files(Dir, RegExp, Recursive, Fun, Acc, file).
+
+fold_files(Dir, RegExp, Recursive, Fun, Acc, Mod) when is_atom(Mod) ->
+ do_fold_files(Dir, RegExp, Recursive, Fun, Acc, Mod).
+
+last_modified(File) ->
+ do_last_modified(File, file).
+
+last_modified(File, Mod) when is_atom(Mod) ->
+ do_last_modified(File, Mod).
+
+file_size(File) ->
+ do_file_size(File, file).
+
+file_size(File, Mod) when is_atom(Mod) ->
+ do_file_size(File, Mod).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+do_wildcard(Pattern, Mod) when is_list(Pattern) ->
+ do_wildcard_comp(do_compile_wildcard(Pattern), Mod).
+
+do_wildcard_comp({compiled_wildcard,{exists,File}}, Mod) ->
+ case eval_read_file_info(File, Mod) of
+ {ok,_} -> [File];
+ _ -> []
+ end;
+do_wildcard_comp({compiled_wildcard,[Base|Rest]}, Mod) ->
+ do_wildcard_1([Base], Rest, Mod).
+
+do_wildcard(Pattern, Cwd, Mod) when is_list(Pattern), is_list(Cwd) ->
+ do_wildcard_comp(do_compile_wildcard(Pattern), Cwd, Mod).
+
+do_wildcard_comp({compiled_wildcard,{exists,File}}, Cwd, Mod) ->
+ case eval_read_file_info(filename:absname(File, Cwd), Mod) of
+ {ok,_} -> [File];
+ _ -> []
+ end;
+do_wildcard_comp({compiled_wildcard,[current|Rest]}, Cwd0, Mod) ->
+ Cwd = filename:join([Cwd0]), %Slash away redundant slashes.
+ PrefixLen = length(Cwd)+1,
+ [lists:nthtail(PrefixLen, N) || N <- do_wildcard_1([Cwd], Rest, Mod)];
+do_wildcard_comp({compiled_wildcard,[Base|Rest]}, _Cwd, Mod) ->
+ do_wildcard_1([Base], Rest, Mod).
+
+do_is_dir(Dir, Mod) ->
+ case eval_read_file_info(Dir, Mod) of
+ {ok, #file_info{type=directory}} ->
+ true;
+ _ ->
+ false
+ end.
+
+do_is_file(File, Mod) ->
+ case eval_read_file_info(File, Mod) of
+ {ok, #file_info{type=regular}} ->
+ true;
+ {ok, #file_info{type=directory}} ->
+ true;
+ _ ->
+ false
+ end.
+
+do_is_regular(File, Mod) ->
+ case eval_read_file_info(File, Mod) of
+ {ok, #file_info{type=regular}} ->
+ true;
+ _ ->
+ false
+ end.
+
+%% fold_files(Dir, RegExp, Recursive, Fun, AccIn).
+
+%% folds the function Fun(F, Acc) -> Acc1 over
+%% all files <F> in <Dir> that match the regular expression <RegExp>
+%% If <Recursive> is true all sub-directories to <Dir> are processed
+
+do_fold_files(Dir, RegExp, Recursive, Fun, Acc, Mod) ->
+ {ok, Re1} = re:compile(RegExp),
+ do_fold_files1(Dir, Re1, Recursive, Fun, Acc, Mod).
+
+do_fold_files1(Dir, RegExp, Recursive, Fun, Acc, Mod) ->
+ case eval_list_dir(Dir, Mod) of
+ {ok, Files} -> do_fold_files2(Files, Dir, RegExp, Recursive, Fun, Acc, Mod);
+ {error, _} -> Acc
+ end.
+
+do_fold_files2([], _Dir, _RegExp, _Recursive, _Fun, Acc, _Mod) ->
+ Acc;
+do_fold_files2([File|T], Dir, RegExp, Recursive, Fun, Acc0, Mod) ->
+ FullName = filename:join(Dir, File),
+ case do_is_regular(FullName, Mod) of
+ true ->
+ case re:run(File, RegExp, [{capture,none}]) of
+ match ->
+ Acc = Fun(FullName, Acc0),
+ do_fold_files2(T, Dir, RegExp, Recursive, Fun, Acc, Mod);
+ nomatch ->
+ do_fold_files2(T, Dir, RegExp, Recursive, Fun, Acc0, Mod)
+ end;
+ false ->
+ case Recursive andalso do_is_dir(FullName, Mod) of
+ true ->
+ Acc1 = do_fold_files1(FullName, RegExp, Recursive,
+ Fun, Acc0, Mod),
+ do_fold_files2(T, Dir, RegExp, Recursive, Fun, Acc1, Mod);
+ false ->
+ do_fold_files2(T, Dir, RegExp, Recursive, Fun, Acc0, Mod)
+ end
+ end.
+
+do_last_modified(File, Mod) ->
+ case eval_read_file_info(File, Mod) of
+ {ok, Info} ->
+ Info#file_info.mtime;
+ _ ->
+ 0
+ end.
+
+do_file_size(File, Mod) ->
+ case eval_read_file_info(File, Mod) of
+ {ok, Info} ->
+ Info#file_info.size;
+ _ ->
+ 0
+ end.
+
+%%----------------------------------------------------------------------
+%% +type ensure_dir(X) -> ok | {error, Reason}.
+%% +type X = filename() | dirname()
+%% ensures that the directory name required to create D exists
+
+ensure_dir("/") ->
+ ok;
+ensure_dir(F) ->
+ Dir = filename:dirname(F),
+ case do_is_dir(Dir, file) of
+ true ->
+ ok;
+ false ->
+ ensure_dir(Dir),
+ prim_file:make_dir(Dir)
+ end.
+
+
+%%%
+%%% Pattern matching using a compiled wildcard.
+%%%
+
+do_wildcard_1(Files, Pattern, Mod) ->
+ do_wildcard_2(Files, Pattern, [], Mod).
+
+do_wildcard_2([File|Rest], Pattern, Result, Mod) ->
+ do_wildcard_2(Rest, Pattern, do_wildcard_3(File, Pattern, Result, Mod), Mod);
+do_wildcard_2([], _, Result, _Mod) ->
+ Result.
+
+do_wildcard_3(Base, [Pattern|Rest], Result, Mod) ->
+ case do_list_dir(Base, Mod) of
+ {ok, Files0} ->
+ Files = lists:sort(Files0),
+ Matches = wildcard_4(Pattern, Files, Base, []),
+ do_wildcard_2(Matches, Rest, Result, Mod);
+ _ ->
+ Result
+ end;
+do_wildcard_3(Base, [], Result, _Mod) ->
+ [Base|Result].
+
+wildcard_4(Pattern, [File|Rest], Base, Result) ->
+ case wildcard_5(Pattern, File) of
+ true ->
+ wildcard_4(Pattern, Rest, Base, [join(Base, File)|Result]);
+ false ->
+ wildcard_4(Pattern, Rest, Base, Result)
+ end;
+wildcard_4(_Patt, [], _Base, Result) ->
+ Result.
+
+wildcard_5([question|Rest1], [_|Rest2]) ->
+ wildcard_5(Rest1, Rest2);
+wildcard_5([accept], _) ->
+ true;
+wildcard_5([star|Rest], File) ->
+ do_star(Rest, File);
+wildcard_5([{one_of, Ordset}|Rest], [C|File]) ->
+ case ordsets:is_element(C, Ordset) of
+ true -> wildcard_5(Rest, File);
+ false -> false
+ end;
+wildcard_5([{alt, Alts}], File) ->
+ do_alt(Alts, File);
+wildcard_5([C|Rest1], [C|Rest2]) when is_integer(C) ->
+ wildcard_5(Rest1, Rest2);
+wildcard_5([X|_], [Y|_]) when is_integer(X), is_integer(Y) ->
+ false;
+wildcard_5([], []) ->
+ true;
+wildcard_5([], [_|_]) ->
+ false;
+wildcard_5([_|_], []) ->
+ false.
+
+do_star(Pattern, [X|Rest]) ->
+ case wildcard_5(Pattern, [X|Rest]) of
+ true -> true;
+ false -> do_star(Pattern, Rest)
+ end;
+do_star(Pattern, []) ->
+ wildcard_5(Pattern, []).
+
+do_alt([Alt|Rest], File) ->
+ case wildcard_5(Alt, File) of
+ true -> true;
+ false -> do_alt(Rest, File)
+ end;
+do_alt([], _File) ->
+ false.
+
+do_list_dir(current, Mod) -> eval_list_dir(".", Mod);
+do_list_dir(Dir, Mod) -> eval_list_dir(Dir, Mod).
+
+join(current, File) -> File;
+join(Base, File) -> filename:join(Base, File).
+
+
+%%% Compiling a wildcard.
+
+compile_wildcard(Pattern) ->
+ ?HANDLE_ERROR(do_compile_wildcard(Pattern)).
+
+do_compile_wildcard(Pattern) ->
+ {compiled_wildcard,compile_wildcard_1(Pattern)}.
+
+compile_wildcard_1(Pattern) ->
+ [Root|Rest] = filename:split(Pattern),
+ case filename:pathtype(Root) of
+ relative ->
+ compile_wildcard_2([Root|Rest], current);
+ _ ->
+ compile_wildcard_2(Rest, [Root])
+ end.
+
+compile_wildcard_2([Part|Rest], Root) ->
+ case compile_part(Part) of
+ Part ->
+ compile_wildcard_2(Rest, join(Root, Part));
+ Pattern ->
+ compile_wildcard_3(Rest, [Pattern,Root])
+ end;
+compile_wildcard_2([], Root) -> {exists,Root}.
+
+compile_wildcard_3([Part|Rest], Result) ->
+ compile_wildcard_3(Rest, [compile_part(Part)|Result]);
+compile_wildcard_3([], Result) ->
+ lists:reverse(Result).
+
+compile_part(Part) ->
+ compile_part(Part, false, []).
+
+compile_part_to_sep(Part) ->
+ compile_part(Part, true, []).
+
+compile_part([], true, _) ->
+ erlang:error({badpattern, missing_delimiter});
+compile_part([$,|Rest], true, Result) ->
+ {ok, $,, lists:reverse(Result), Rest};
+compile_part([$}|Rest], true, Result) ->
+ {ok, $}, lists:reverse(Result), Rest};
+compile_part([$?|Rest], Upto, Result) ->
+ compile_part(Rest, Upto, [question|Result]);
+compile_part([$*], Upto, Result) ->
+ compile_part([], Upto, [accept|Result]);
+compile_part([$*|Rest], Upto, Result) ->
+ compile_part(Rest, Upto, [star|Result]);
+compile_part([$[|Rest], Upto, Result) ->
+ case compile_charset(Rest, ordsets:new()) of
+ {ok, Charset, Rest1} ->
+ compile_part(Rest1, Upto, [Charset|Result]);
+ error ->
+ compile_part(Rest, Upto, [$[|Result])
+ end;
+compile_part([${|Rest], Upto, Result) ->
+ case compile_alt(Rest) of
+ {ok, Alt} ->
+ lists:reverse(Result, [Alt]);
+ error ->
+ compile_part(Rest, Upto, [${|Result])
+ end;
+compile_part([X|Rest], Upto, Result) ->
+ compile_part(Rest, Upto, [X|Result]);
+compile_part([], _Upto, Result) ->
+ lists:reverse(Result).
+
+compile_charset([$]|Rest], Ordset) ->
+ compile_charset1(Rest, ordsets:add_element($], Ordset));
+compile_charset([$-|Rest], Ordset) ->
+ compile_charset1(Rest, ordsets:add_element($-, Ordset));
+compile_charset([], _Ordset) ->
+ error;
+compile_charset(List, Ordset) ->
+ compile_charset1(List, Ordset).
+
+compile_charset1([Lower, $-, Upper|Rest], Ordset) when Lower =< Upper ->
+ compile_charset1(Rest, compile_range(Lower, Upper, Ordset));
+compile_charset1([$]|Rest], Ordset) ->
+ {ok, {one_of, Ordset}, Rest};
+compile_charset1([X|Rest], Ordset) ->
+ compile_charset1(Rest, ordsets:add_element(X, Ordset));
+compile_charset1([], _Ordset) ->
+ error.
+
+compile_range(Lower, Current, Ordset) when Lower =< Current ->
+ compile_range(Lower, Current-1, ordsets:add_element(Current, Ordset));
+compile_range(_, _, Ordset) ->
+ Ordset.
+
+compile_alt(Pattern) ->
+ compile_alt(Pattern, []).
+
+compile_alt(Pattern, Result) ->
+ case compile_part_to_sep(Pattern) of
+ {ok, $,, AltPattern, Rest} ->
+ compile_alt(Rest, [AltPattern|Result]);
+ {ok, $}, AltPattern, Rest} ->
+ NewResult = [AltPattern|Result],
+ RestPattern = compile_part(Rest),
+ {ok, {alt, [Alt++RestPattern || Alt <- NewResult]}};
+ Pattern ->
+ error
+ end.
+
+eval_read_file_info(File, file) ->
+ prim_file:read_file_info(File);
+eval_read_file_info(File, erl_prim_loader) ->
+ case erl_prim_loader:read_file_info(File) of
+ error -> {error, erl_prim_loader};
+ Res-> Res
+ end;
+eval_read_file_info(File, Mod) ->
+ Mod:read_file_info(File).
+
+eval_list_dir(Dir, file) ->
+ prim_file:list_dir(Dir);
+eval_list_dir(Dir, erl_prim_loader) ->
+ case erl_prim_loader:list_dir(Dir) of
+ error -> {error, erl_prim_loader};
+ Res-> Res
+ end;
+eval_list_dir(Dir, Mod) ->
+ Mod:list_dir(Dir).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0b39a209..77a9b5f2 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -526,7 +526,36 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
-read_term_file(File) -> file:consult(File).
+read_term_file(File) ->
+ try
+ {ok, Data} = prim_file:read_file(File),
+ {ok, Tokens, _} = erl_scan:string(binary:bin_to_list(Data)),
+ TokenGroups = group_tokens(Tokens),
+ {ok, [begin
+ {ok, Term} = erl_parse:parse_term(Tokens1),
+ Term
+ end || Tokens1 <- TokenGroups]}
+ catch
+ error:{badmatch, Error} -> Error
+ end.
+
+group_tokens([]) ->
+ [];
+group_tokens([{_, N, _} | _] = Tokens) ->
+ lists:reverse(group_tokens([], N, Tokens));
+group_tokens([{_, N} | _] = Tokens) ->
+ lists:reverse(group_tokens([], N, Tokens)).
+
+group_tokens(Cur, _, []) ->
+ [lists:reverse(Cur)];
+group_tokens(Cur, N, [Tok = {_, N} | Toks]) ->
+ group_tokens([Tok | Cur], N, Toks);
+group_tokens(Cur, _, [{_, M} | _] = Toks) ->
+ [lists:reverse(Cur) | group_tokens([], M, Toks)];
+group_tokens(Cur, N, [Tok = {_, N, _} | Toks]) ->
+ group_tokens([Tok | Cur], N, Toks);
+group_tokens(Cur, _, [{_, M, _} | _] = Toks) ->
+ [lists:reverse(Cur) | group_tokens([], M, Toks)].
write_term_file(File, Terms) ->
write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
@@ -544,12 +573,12 @@ write_file(Path, Data, Modes) ->
Modes1 = [binary, write | (Modes -- [binary, write])],
case make_binary(Data) of
Bin when is_binary(Bin) ->
- case file:open(Path, Modes1) of
- {ok, Hdl} -> try file:write(Hdl, Bin) of
- ok -> file:sync(Hdl);
+ case prim_file:open(Path, Modes1) of
+ {ok, Hdl} -> try prim_file:write(Hdl, Bin) of
+ ok -> prim_file:sync(Hdl);
{error, _} = E -> E
after
- file:close(Hdl)
+ prim_file:close(Hdl)
end;
{error, _} = E -> E
end;
@@ -567,7 +596,7 @@ make_binary(List) ->
append_file(File, Suffix) ->
- case file:read_file_info(File) of
+ case prim_file:read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
{error, enoent} -> append_file(File, 0, Suffix);
Error -> Error
@@ -576,18 +605,18 @@ append_file(File, Suffix) ->
append_file(_, _, "") ->
ok;
append_file(File, 0, Suffix) ->
- case file:open([File, Suffix], [append]) of
- {ok, Fd} -> file:close(Fd);
+ case prim_file:open([File, Suffix], [append]) of
+ {ok, Fd} -> prim_file:close(Fd);
Error -> Error
end;
append_file(File, _, Suffix) ->
- case file:read_file(File) of
+ case prim_file:read_file(File) of
{ok, Data} -> write_file([File, Suffix], Data, [append]);
Error -> Error
end.
ensure_parent_dirs_exist(Filename) ->
- case filelib:ensure_dir(Filename) of
+ case filelib2:ensure_dir(Filename) of
ok -> ok;
{error, Reason} ->
throw({error, {cannot_create_parent_dirs, Filename, Reason}})
@@ -749,13 +778,13 @@ recursive_delete(Files) ->
end, ok, Files).
recursive_delete1(Path) ->
- case filelib:is_dir(Path) and not(is_symlink(Path)) of
- false -> case file:delete(Path) of
+ case filelib2:is_dir(Path) and not(is_symlink(Path)) of
+ false -> case prim_file:delete(Path) of
ok -> ok;
{error, enoent} -> ok; %% Path doesn't exist anyway
{error, Err} -> {error, {Path, Err}}
end;
- true -> case file:list_dir(Path) of
+ true -> case prim_file:list_dir(Path) of
{ok, FileNames} ->
case lists:foldl(
fun (FileName, ok) ->
@@ -765,7 +794,7 @@ recursive_delete1(Path) ->
Error
end, ok, FileNames) of
ok ->
- case file:del_dir(Path) of
+ case prim_file:del_dir(Path) of
ok -> ok;
{error, Err} -> {error, {Path, Err}}
end;
@@ -784,15 +813,15 @@ is_symlink(Name) ->
end.
recursive_copy(Src, Dest) ->
- case filelib:is_dir(Src) of
- false -> case file:copy(Src, Dest) of
+ case filelib2:is_dir(Src) of
+ false -> case prim_file:copy(Src, Dest, infinity) of
{ok, _Bytes} -> ok;
{error, enoent} -> ok; %% Path doesn't exist anyway
{error, Err} -> {error, {Src, Dest, Err}}
end;
- true -> case file:list_dir(Src) of
+ true -> case prim_file:list_dir(Src) of
{ok, FileNames} ->
- case file:make_dir(Dest) of
+ case prim_file:make_dir(Dest) of
ok ->
lists:foldl(
fun (FileName, ok) ->
@@ -902,10 +931,10 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
%% TODO: When we stop supporting Erlang prior to R14, this should be
%% replaced with file:open [write, exclusive]
lock_file(Path) ->
- case filelib:is_file(Path) of
+ case filelib2:is_file(Path) of
true -> {error, eexist};
- false -> {ok, Lock} = file:open(Path, [write]),
- ok = file:close(Lock)
+ false -> {ok, Lock} = prim_file:open(Path, [write]),
+ ok = prim_file:close(Lock)
end.
const_ok() -> ok.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 636913b5..4461da42 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -229,7 +229,7 @@
init(Name, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
- false = filelib:is_file(Dir), %% is_file == is file or dir
+ false = filelib2:is_file(Dir), %% is_file == is file or dir
State #qistate { on_sync = OnSyncFun }.
shutdown_terms(Name) ->
@@ -256,6 +256,7 @@ terminate(Terms, State) ->
delete_and_terminate(State) ->
{_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
+ ok = file_handle_cache:obtain(),
ok = rabbit_misc:recursive_delete([Dir]),
State1.
@@ -366,9 +367,9 @@ recover(DurableQueues) ->
{DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
all_queue_directory_names(Dir) ->
- case file:list_dir(Dir) of
+ case prim_file:list_dir(Dir) of
{ok, Entries} -> [ Entry || Entry <- Entries,
- filelib:is_dir(
+ filelib2:is_dir(
filename:join(Dir, Entry)) ];
{error, enoent} -> []
end.
@@ -392,7 +393,7 @@ blank_state(QueueName) ->
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
detect_clean_shutdown(Dir) ->
- case file:delete(clean_file_name(Dir)) of
+ case prim_file:delete(clean_file_name(Dir)) of
ok -> true;
{error, enoent} -> false
end.
@@ -402,7 +403,7 @@ read_shutdown_terms(Dir) ->
store_clean_shutdown(Terms, Dir) ->
CleanFileName = clean_file_name(Dir),
- ok = filelib:ensure_dir(CleanFileName),
+ ok = filelib2:ensure_dir(CleanFileName),
rabbit_misc:write_term_file(CleanFileName, Terms).
init_clean(RecoveredCounts, State) ->
@@ -603,8 +604,8 @@ flush_journal(State = #qistate { segments = Segments }) ->
Segments1 =
segment_fold(
fun (#segment { unacked = 0, path = Path }, SegmentsN) ->
- case filelib:is_file(Path) of
- true -> ok = file:delete(Path);
+ case filelib2:is_file(Path) of
+ true -> ok = prim_file:delete(Path);
false -> ok
end,
SegmentsN;
@@ -630,7 +631,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
- ok = filelib:ensure_dir(Path),
+ ok = filelib2:ensure_dir(Path),
{ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
@@ -735,7 +736,7 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
lists:takewhile(fun (C) -> $0 =< C andalso C =< $9 end,
SegName)), Set)
end, sets:from_list(segment_nums(Segments)),
- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))).
+ filelib2:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))).
segment_find_or_new(Seg, Dir, Segments) ->
case segment_find(Seg, Segments) of
@@ -836,7 +837,7 @@ segment_entries_foldr(Fun, Init,
%%
%% Does not do any combining with the journal at all.
load_segment(KeepAcked, #segment { path = Path }) ->
- case filelib:is_file(Path) of
+ case filelib2:is_file(Path) of
false -> {array_new(), 0};
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
@@ -1040,12 +1041,12 @@ foreach_queue_index(Funs) ->
transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
[ok = transform_file(filename:join(Dir, Seg), SegmentFun)
- || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)],
+ || Seg <- filelib2:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)],
ok = gatherer:finish(Gatherer).
transform_file(Path, Fun) ->
PathTmp = Path ++ ".upgrade",
- case filelib:file_size(Path) of
+ case filelib2:file_size(Path) of
0 -> ok;
Size -> {ok, PathTmpHdl} =
file_handle_cache:open(PathTmp, ?WRITE_MODE,
@@ -1059,7 +1060,7 @@ transform_file(Path, Fun) ->
ok = drive_transform_fun(Fun, PathTmpHdl, Content),
ok = file_handle_cache:close(PathTmpHdl),
- ok = file:rename(PathTmp, Path)
+ ok = prim_file:rename(PathTmp, Path)
end.
drive_transform_fun(Fun, Hdl, Contents) ->
diff --git a/src/serialiser.erl b/src/serialiser.erl
new file mode 100644
index 00000000..0f9bcf17
--- /dev/null
+++ b/src/serialiser.erl
@@ -0,0 +1,75 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%%
+
+-module(serialiser).
+
+-behaviour(gen_server2).
+
+-export([start_link/0, submit/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(submit/2 ::
+ (pid() | atom(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
+
+submit(Pid, Fun) ->
+ gen_server2:call(Pid, {run, Fun}, infinity).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, nostate, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call({run, Fun}, _From, State) ->
+ {reply, run(Fun), State, hibernate};
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, State}.
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+%%----------------------------------------------------------------------------
+
+run({M, F, A}) -> apply(M, F, A);
+run(Fun) -> Fun().