diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-16 13:25:48 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-16 13:25:48 +0100 |
commit | e5a6fa10bc89cec088b3a6d9dc442c7ee26754a8 (patch) | |
tree | 0824dd232493525a88c6ba02f18343a05377ba26 | |
parent | 2801f9c5fa4653688bf09e10c02ca5da2b2fe14a (diff) | |
parent | 097d324a9db06bc40c2805e9ca3caca0278694c7 (diff) | |
download | rabbitmq-server-e5a6fa10bc89cec088b3a6d9dc442c7ee26754a8.tar.gz |
merge default into bug24386
-rw-r--r-- | src/file_handle_cache.erl | 30 | ||||
-rw-r--r-- | src/filelib2.erl | 428 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 71 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 27 | ||||
-rw-r--r-- | src/serialiser.erl | 75 |
5 files changed, 582 insertions, 49 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index e14dfe22..d4d7cdda 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -318,7 +318,7 @@ read(Ref, Count) -> fun ([#handle { is_read = false }]) -> {error, not_open_for_reading}; ([Handle = #handle { hdl = Hdl, offset = Offset }]) -> - case file:read(Hdl, Count) of + case prim_file:read(Hdl, Count) of {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data), {Obj, [Handle #handle { offset = Offset1 }]}; @@ -338,7 +338,7 @@ append(Ref, Data) -> write_buffer_size_limit = 0, at_eof = true } = Handle1} -> Offset1 = Offset + iolist_size(Data), - {file:write(Hdl, Data), + {prim_file:write(Hdl, Data), [Handle1 #handle { is_dirty = true, offset = Offset1 }]}; {{ok, _Offset}, #handle { write_buffer = WriteBuffer, write_buffer_size = Size, @@ -365,7 +365,7 @@ sync(Ref) -> ok; ([Handle = #handle { hdl = Hdl, is_dirty = true, write_buffer = [] }]) -> - case file:sync(Hdl) of + case prim_file:sync(Hdl) of ok -> {ok, [Handle #handle { is_dirty = false }]}; Error -> {Error, [Handle]} end @@ -382,7 +382,7 @@ truncate(Ref) -> with_flushed_handles( [Ref], fun ([Handle1 = #handle { hdl = Hdl }]) -> - case file:truncate(Hdl) of + case prim_file:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; Error -> {Error, [Handle1]} end @@ -409,7 +409,7 @@ copy(Src, Dest, Count) -> fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset }, DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }] ) -> - case file:copy(SHdl, DHdl, Count) of + case prim_file:copy(SHdl, DHdl, Count) of {ok, Count1} = Result1 -> {Result1, [SHandle #handle { offset = SOffset + Count1 }, @@ -429,7 +429,7 @@ delete(Ref) -> Handle = #handle { path = Path } -> case hard_close(Handle #handle { is_dirty = false, write_buffer = [] }) of - ok -> file:delete(Path); + ok -> prim_file:delete(Path); {Error, Handle1} -> put_handle(Ref, Handle1), Error end @@ -444,7 +444,7 @@ clear(Ref) -> case maybe_seek(bof, Handle #handle { write_buffer = [], write_buffer_size = 0 }) of {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> - case file:truncate(Hdl) of + case prim_file:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; Error -> {Error, [Handle1]} end; @@ -567,10 +567,10 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, offset = Offset, last_used_at = undefined }} | RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> - case file:open(Path, case NewOrReopen of - new -> Mode; - reopen -> [read | Mode] - end) of + case prim_file:open(Path, case NewOrReopen of + new -> Mode; + reopen -> [read | Mode] + end) of {ok, Hdl} -> Now = now(), {{ok, _Offset}, Handle1} = @@ -694,10 +694,10 @@ soft_close(Handle) -> is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of - true -> file:sync(Hdl); + true -> prim_file:sync(Hdl); false -> ok end, - ok = file:close(Hdl), + ok = prim_file:close(Hdl), age_tree_delete(Then), {ok, Handle1 #handle { hdl = closed, is_dirty = false, @@ -732,7 +732,7 @@ maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset, at_eof = AtEoF }) -> {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), case (case NeedsSeek of - true -> file:position(Hdl, NewOffset); + true -> prim_file:position(Hdl, NewOffset); false -> {ok, Offset} end) of {ok, Offset1} = Result -> @@ -769,7 +769,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, write_buffer = WriteBuffer, write_buffer_size = DataSize, at_eof = true }) -> - case file:write(Hdl, lists:reverse(WriteBuffer)) of + case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of ok -> Offset1 = Offset + DataSize, {ok, Handle #handle { offset = Offset1, is_dirty = true, diff --git a/src/filelib2.erl b/src/filelib2.erl new file mode 100644 index 00000000..e92396ea --- /dev/null +++ b/src/filelib2.erl @@ -0,0 +1,428 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% + +%% This is the 'filelib' module ported from R13B03. The only changes +%% are replacing calls to 'file' with calls to 'prim_file', removing +%% most of the specs and inlining the call to error. + +-module(filelib2). + +%% File utilities. + +-export([wildcard/1, wildcard/2, is_dir/1, is_file/1, is_regular/1, + compile_wildcard/1]). +-export([fold_files/5, last_modified/1, file_size/1, ensure_dir/1]). + +-export([wildcard/3, is_dir/2, is_file/2, is_regular/2]). +-export([fold_files/6, last_modified/2, file_size/2]). + +-include_lib("kernel/include/file.hrl"). + +-define(HANDLE_ERROR(Expr), + try + Expr + catch + error:{badpattern,_}=UnUsUalVaRiAbLeNaMe -> + %% Get the stack backtrace correct. + erlang:error(UnUsUalVaRiAbLeNaMe) + end). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +wildcard(Pattern) when is_list(Pattern) -> + ?HANDLE_ERROR(do_wildcard(Pattern, file)). + +wildcard(Pattern, Cwd) when is_list(Pattern), is_list(Cwd) -> + ?HANDLE_ERROR(do_wildcard(Pattern, Cwd, file)); +wildcard(Pattern, Mod) when is_list(Pattern), is_atom(Mod) -> + ?HANDLE_ERROR(do_wildcard(Pattern, Mod)). + +wildcard(Pattern, Cwd, Mod) + when is_list(Pattern), is_list(Cwd), is_atom(Mod) -> + ?HANDLE_ERROR(do_wildcard(Pattern, Cwd, Mod)). + +is_dir(Dir) -> + do_is_dir(Dir, file). + +is_dir(Dir, Mod) when is_atom(Mod) -> + do_is_dir(Dir, Mod). + +is_file(File) -> + do_is_file(File, file). + +is_file(File, Mod) when is_atom(Mod) -> + do_is_file(File, Mod). + +is_regular(File) -> + do_is_regular(File, file). + +is_regular(File, Mod) when is_atom(Mod) -> + do_is_regular(File, Mod). + +fold_files(Dir, RegExp, Recursive, Fun, Acc) -> + do_fold_files(Dir, RegExp, Recursive, Fun, Acc, file). + +fold_files(Dir, RegExp, Recursive, Fun, Acc, Mod) when is_atom(Mod) -> + do_fold_files(Dir, RegExp, Recursive, Fun, Acc, Mod). + +last_modified(File) -> + do_last_modified(File, file). + +last_modified(File, Mod) when is_atom(Mod) -> + do_last_modified(File, Mod). + +file_size(File) -> + do_file_size(File, file). + +file_size(File, Mod) when is_atom(Mod) -> + do_file_size(File, Mod). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +do_wildcard(Pattern, Mod) when is_list(Pattern) -> + do_wildcard_comp(do_compile_wildcard(Pattern), Mod). + +do_wildcard_comp({compiled_wildcard,{exists,File}}, Mod) -> + case eval_read_file_info(File, Mod) of + {ok,_} -> [File]; + _ -> [] + end; +do_wildcard_comp({compiled_wildcard,[Base|Rest]}, Mod) -> + do_wildcard_1([Base], Rest, Mod). + +do_wildcard(Pattern, Cwd, Mod) when is_list(Pattern), is_list(Cwd) -> + do_wildcard_comp(do_compile_wildcard(Pattern), Cwd, Mod). + +do_wildcard_comp({compiled_wildcard,{exists,File}}, Cwd, Mod) -> + case eval_read_file_info(filename:absname(File, Cwd), Mod) of + {ok,_} -> [File]; + _ -> [] + end; +do_wildcard_comp({compiled_wildcard,[current|Rest]}, Cwd0, Mod) -> + Cwd = filename:join([Cwd0]), %Slash away redundant slashes. + PrefixLen = length(Cwd)+1, + [lists:nthtail(PrefixLen, N) || N <- do_wildcard_1([Cwd], Rest, Mod)]; +do_wildcard_comp({compiled_wildcard,[Base|Rest]}, _Cwd, Mod) -> + do_wildcard_1([Base], Rest, Mod). + +do_is_dir(Dir, Mod) -> + case eval_read_file_info(Dir, Mod) of + {ok, #file_info{type=directory}} -> + true; + _ -> + false + end. + +do_is_file(File, Mod) -> + case eval_read_file_info(File, Mod) of + {ok, #file_info{type=regular}} -> + true; + {ok, #file_info{type=directory}} -> + true; + _ -> + false + end. + +do_is_regular(File, Mod) -> + case eval_read_file_info(File, Mod) of + {ok, #file_info{type=regular}} -> + true; + _ -> + false + end. + +%% fold_files(Dir, RegExp, Recursive, Fun, AccIn). + +%% folds the function Fun(F, Acc) -> Acc1 over +%% all files <F> in <Dir> that match the regular expression <RegExp> +%% If <Recursive> is true all sub-directories to <Dir> are processed + +do_fold_files(Dir, RegExp, Recursive, Fun, Acc, Mod) -> + {ok, Re1} = re:compile(RegExp), + do_fold_files1(Dir, Re1, Recursive, Fun, Acc, Mod). + +do_fold_files1(Dir, RegExp, Recursive, Fun, Acc, Mod) -> + case eval_list_dir(Dir, Mod) of + {ok, Files} -> do_fold_files2(Files, Dir, RegExp, Recursive, Fun, Acc, Mod); + {error, _} -> Acc + end. + +do_fold_files2([], _Dir, _RegExp, _Recursive, _Fun, Acc, _Mod) -> + Acc; +do_fold_files2([File|T], Dir, RegExp, Recursive, Fun, Acc0, Mod) -> + FullName = filename:join(Dir, File), + case do_is_regular(FullName, Mod) of + true -> + case re:run(File, RegExp, [{capture,none}]) of + match -> + Acc = Fun(FullName, Acc0), + do_fold_files2(T, Dir, RegExp, Recursive, Fun, Acc, Mod); + nomatch -> + do_fold_files2(T, Dir, RegExp, Recursive, Fun, Acc0, Mod) + end; + false -> + case Recursive andalso do_is_dir(FullName, Mod) of + true -> + Acc1 = do_fold_files1(FullName, RegExp, Recursive, + Fun, Acc0, Mod), + do_fold_files2(T, Dir, RegExp, Recursive, Fun, Acc1, Mod); + false -> + do_fold_files2(T, Dir, RegExp, Recursive, Fun, Acc0, Mod) + end + end. + +do_last_modified(File, Mod) -> + case eval_read_file_info(File, Mod) of + {ok, Info} -> + Info#file_info.mtime; + _ -> + 0 + end. + +do_file_size(File, Mod) -> + case eval_read_file_info(File, Mod) of + {ok, Info} -> + Info#file_info.size; + _ -> + 0 + end. + +%%---------------------------------------------------------------------- +%% +type ensure_dir(X) -> ok | {error, Reason}. +%% +type X = filename() | dirname() +%% ensures that the directory name required to create D exists + +ensure_dir("/") -> + ok; +ensure_dir(F) -> + Dir = filename:dirname(F), + case do_is_dir(Dir, file) of + true -> + ok; + false -> + ensure_dir(Dir), + prim_file:make_dir(Dir) + end. + + +%%% +%%% Pattern matching using a compiled wildcard. +%%% + +do_wildcard_1(Files, Pattern, Mod) -> + do_wildcard_2(Files, Pattern, [], Mod). + +do_wildcard_2([File|Rest], Pattern, Result, Mod) -> + do_wildcard_2(Rest, Pattern, do_wildcard_3(File, Pattern, Result, Mod), Mod); +do_wildcard_2([], _, Result, _Mod) -> + Result. + +do_wildcard_3(Base, [Pattern|Rest], Result, Mod) -> + case do_list_dir(Base, Mod) of + {ok, Files0} -> + Files = lists:sort(Files0), + Matches = wildcard_4(Pattern, Files, Base, []), + do_wildcard_2(Matches, Rest, Result, Mod); + _ -> + Result + end; +do_wildcard_3(Base, [], Result, _Mod) -> + [Base|Result]. + +wildcard_4(Pattern, [File|Rest], Base, Result) -> + case wildcard_5(Pattern, File) of + true -> + wildcard_4(Pattern, Rest, Base, [join(Base, File)|Result]); + false -> + wildcard_4(Pattern, Rest, Base, Result) + end; +wildcard_4(_Patt, [], _Base, Result) -> + Result. + +wildcard_5([question|Rest1], [_|Rest2]) -> + wildcard_5(Rest1, Rest2); +wildcard_5([accept], _) -> + true; +wildcard_5([star|Rest], File) -> + do_star(Rest, File); +wildcard_5([{one_of, Ordset}|Rest], [C|File]) -> + case ordsets:is_element(C, Ordset) of + true -> wildcard_5(Rest, File); + false -> false + end; +wildcard_5([{alt, Alts}], File) -> + do_alt(Alts, File); +wildcard_5([C|Rest1], [C|Rest2]) when is_integer(C) -> + wildcard_5(Rest1, Rest2); +wildcard_5([X|_], [Y|_]) when is_integer(X), is_integer(Y) -> + false; +wildcard_5([], []) -> + true; +wildcard_5([], [_|_]) -> + false; +wildcard_5([_|_], []) -> + false. + +do_star(Pattern, [X|Rest]) -> + case wildcard_5(Pattern, [X|Rest]) of + true -> true; + false -> do_star(Pattern, Rest) + end; +do_star(Pattern, []) -> + wildcard_5(Pattern, []). + +do_alt([Alt|Rest], File) -> + case wildcard_5(Alt, File) of + true -> true; + false -> do_alt(Rest, File) + end; +do_alt([], _File) -> + false. + +do_list_dir(current, Mod) -> eval_list_dir(".", Mod); +do_list_dir(Dir, Mod) -> eval_list_dir(Dir, Mod). + +join(current, File) -> File; +join(Base, File) -> filename:join(Base, File). + + +%%% Compiling a wildcard. + +compile_wildcard(Pattern) -> + ?HANDLE_ERROR(do_compile_wildcard(Pattern)). + +do_compile_wildcard(Pattern) -> + {compiled_wildcard,compile_wildcard_1(Pattern)}. + +compile_wildcard_1(Pattern) -> + [Root|Rest] = filename:split(Pattern), + case filename:pathtype(Root) of + relative -> + compile_wildcard_2([Root|Rest], current); + _ -> + compile_wildcard_2(Rest, [Root]) + end. + +compile_wildcard_2([Part|Rest], Root) -> + case compile_part(Part) of + Part -> + compile_wildcard_2(Rest, join(Root, Part)); + Pattern -> + compile_wildcard_3(Rest, [Pattern,Root]) + end; +compile_wildcard_2([], Root) -> {exists,Root}. + +compile_wildcard_3([Part|Rest], Result) -> + compile_wildcard_3(Rest, [compile_part(Part)|Result]); +compile_wildcard_3([], Result) -> + lists:reverse(Result). + +compile_part(Part) -> + compile_part(Part, false, []). + +compile_part_to_sep(Part) -> + compile_part(Part, true, []). + +compile_part([], true, _) -> + erlang:error({badpattern, missing_delimiter}); +compile_part([$,|Rest], true, Result) -> + {ok, $,, lists:reverse(Result), Rest}; +compile_part([$}|Rest], true, Result) -> + {ok, $}, lists:reverse(Result), Rest}; +compile_part([$?|Rest], Upto, Result) -> + compile_part(Rest, Upto, [question|Result]); +compile_part([$*], Upto, Result) -> + compile_part([], Upto, [accept|Result]); +compile_part([$*|Rest], Upto, Result) -> + compile_part(Rest, Upto, [star|Result]); +compile_part([$[|Rest], Upto, Result) -> + case compile_charset(Rest, ordsets:new()) of + {ok, Charset, Rest1} -> + compile_part(Rest1, Upto, [Charset|Result]); + error -> + compile_part(Rest, Upto, [$[|Result]) + end; +compile_part([${|Rest], Upto, Result) -> + case compile_alt(Rest) of + {ok, Alt} -> + lists:reverse(Result, [Alt]); + error -> + compile_part(Rest, Upto, [${|Result]) + end; +compile_part([X|Rest], Upto, Result) -> + compile_part(Rest, Upto, [X|Result]); +compile_part([], _Upto, Result) -> + lists:reverse(Result). + +compile_charset([$]|Rest], Ordset) -> + compile_charset1(Rest, ordsets:add_element($], Ordset)); +compile_charset([$-|Rest], Ordset) -> + compile_charset1(Rest, ordsets:add_element($-, Ordset)); +compile_charset([], _Ordset) -> + error; +compile_charset(List, Ordset) -> + compile_charset1(List, Ordset). + +compile_charset1([Lower, $-, Upper|Rest], Ordset) when Lower =< Upper -> + compile_charset1(Rest, compile_range(Lower, Upper, Ordset)); +compile_charset1([$]|Rest], Ordset) -> + {ok, {one_of, Ordset}, Rest}; +compile_charset1([X|Rest], Ordset) -> + compile_charset1(Rest, ordsets:add_element(X, Ordset)); +compile_charset1([], _Ordset) -> + error. + +compile_range(Lower, Current, Ordset) when Lower =< Current -> + compile_range(Lower, Current-1, ordsets:add_element(Current, Ordset)); +compile_range(_, _, Ordset) -> + Ordset. + +compile_alt(Pattern) -> + compile_alt(Pattern, []). + +compile_alt(Pattern, Result) -> + case compile_part_to_sep(Pattern) of + {ok, $,, AltPattern, Rest} -> + compile_alt(Rest, [AltPattern|Result]); + {ok, $}, AltPattern, Rest} -> + NewResult = [AltPattern|Result], + RestPattern = compile_part(Rest), + {ok, {alt, [Alt++RestPattern || Alt <- NewResult]}}; + Pattern -> + error + end. + +eval_read_file_info(File, file) -> + prim_file:read_file_info(File); +eval_read_file_info(File, erl_prim_loader) -> + case erl_prim_loader:read_file_info(File) of + error -> {error, erl_prim_loader}; + Res-> Res + end; +eval_read_file_info(File, Mod) -> + Mod:read_file_info(File). + +eval_list_dir(Dir, file) -> + prim_file:list_dir(Dir); +eval_list_dir(Dir, erl_prim_loader) -> + case erl_prim_loader:list_dir(Dir) of + error -> {error, erl_prim_loader}; + Res-> Res + end; +eval_list_dir(Dir, Mod) -> + Mod:list_dir(Dir). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0b39a209..77a9b5f2 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -526,7 +526,36 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> dirty_dump_log1(LH, disk_log:chunk(LH, K)). -read_term_file(File) -> file:consult(File). +read_term_file(File) -> + try + {ok, Data} = prim_file:read_file(File), + {ok, Tokens, _} = erl_scan:string(binary:bin_to_list(Data)), + TokenGroups = group_tokens(Tokens), + {ok, [begin + {ok, Term} = erl_parse:parse_term(Tokens1), + Term + end || Tokens1 <- TokenGroups]} + catch + error:{badmatch, Error} -> Error + end. + +group_tokens([]) -> + []; +group_tokens([{_, N, _} | _] = Tokens) -> + lists:reverse(group_tokens([], N, Tokens)); +group_tokens([{_, N} | _] = Tokens) -> + lists:reverse(group_tokens([], N, Tokens)). + +group_tokens(Cur, _, []) -> + [lists:reverse(Cur)]; +group_tokens(Cur, N, [Tok = {_, N} | Toks]) -> + group_tokens([Tok | Cur], N, Toks); +group_tokens(Cur, _, [{_, M} | _] = Toks) -> + [lists:reverse(Cur) | group_tokens([], M, Toks)]; +group_tokens(Cur, N, [Tok = {_, N, _} | Toks]) -> + group_tokens([Tok | Cur], N, Toks); +group_tokens(Cur, _, [{_, M, _} | _] = Toks) -> + [lists:reverse(Cur) | group_tokens([], M, Toks)]. write_term_file(File, Terms) -> write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || @@ -544,12 +573,12 @@ write_file(Path, Data, Modes) -> Modes1 = [binary, write | (Modes -- [binary, write])], case make_binary(Data) of Bin when is_binary(Bin) -> - case file:open(Path, Modes1) of - {ok, Hdl} -> try file:write(Hdl, Bin) of - ok -> file:sync(Hdl); + case prim_file:open(Path, Modes1) of + {ok, Hdl} -> try prim_file:write(Hdl, Bin) of + ok -> prim_file:sync(Hdl); {error, _} = E -> E after - file:close(Hdl) + prim_file:close(Hdl) end; {error, _} = E -> E end; @@ -567,7 +596,7 @@ make_binary(List) -> append_file(File, Suffix) -> - case file:read_file_info(File) of + case prim_file:read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); {error, enoent} -> append_file(File, 0, Suffix); Error -> Error @@ -576,18 +605,18 @@ append_file(File, Suffix) -> append_file(_, _, "") -> ok; append_file(File, 0, Suffix) -> - case file:open([File, Suffix], [append]) of - {ok, Fd} -> file:close(Fd); + case prim_file:open([File, Suffix], [append]) of + {ok, Fd} -> prim_file:close(Fd); Error -> Error end; append_file(File, _, Suffix) -> - case file:read_file(File) of + case prim_file:read_file(File) of {ok, Data} -> write_file([File, Suffix], Data, [append]); Error -> Error end. ensure_parent_dirs_exist(Filename) -> - case filelib:ensure_dir(Filename) of + case filelib2:ensure_dir(Filename) of ok -> ok; {error, Reason} -> throw({error, {cannot_create_parent_dirs, Filename, Reason}}) @@ -749,13 +778,13 @@ recursive_delete(Files) -> end, ok, Files). recursive_delete1(Path) -> - case filelib:is_dir(Path) and not(is_symlink(Path)) of - false -> case file:delete(Path) of + case filelib2:is_dir(Path) and not(is_symlink(Path)) of + false -> case prim_file:delete(Path) of ok -> ok; {error, enoent} -> ok; %% Path doesn't exist anyway {error, Err} -> {error, {Path, Err}} end; - true -> case file:list_dir(Path) of + true -> case prim_file:list_dir(Path) of {ok, FileNames} -> case lists:foldl( fun (FileName, ok) -> @@ -765,7 +794,7 @@ recursive_delete1(Path) -> Error end, ok, FileNames) of ok -> - case file:del_dir(Path) of + case prim_file:del_dir(Path) of ok -> ok; {error, Err} -> {error, {Path, Err}} end; @@ -784,15 +813,15 @@ is_symlink(Name) -> end. recursive_copy(Src, Dest) -> - case filelib:is_dir(Src) of - false -> case file:copy(Src, Dest) of + case filelib2:is_dir(Src) of + false -> case prim_file:copy(Src, Dest, infinity) of {ok, _Bytes} -> ok; {error, enoent} -> ok; %% Path doesn't exist anyway {error, Err} -> {error, {Src, Dest, Err}} end; - true -> case file:list_dir(Src) of + true -> case prim_file:list_dir(Src) of {ok, FileNames} -> - case file:make_dir(Dest) of + case prim_file:make_dir(Dest) of ok -> lists:foldl( fun (FileName, ok) -> @@ -902,10 +931,10 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) -> %% TODO: When we stop supporting Erlang prior to R14, this should be %% replaced with file:open [write, exclusive] lock_file(Path) -> - case filelib:is_file(Path) of + case filelib2:is_file(Path) of true -> {error, eexist}; - false -> {ok, Lock} = file:open(Path, [write]), - ok = file:close(Lock) + false -> {ok, Lock} = prim_file:open(Path, [write]), + ok = prim_file:close(Lock) end. const_ok() -> ok. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 636913b5..4461da42 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -229,7 +229,7 @@ init(Name, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), - false = filelib:is_file(Dir), %% is_file == is file or dir + false = filelib2:is_file(Dir), %% is_file == is file or dir State #qistate { on_sync = OnSyncFun }. shutdown_terms(Name) -> @@ -256,6 +256,7 @@ terminate(Terms, State) -> delete_and_terminate(State) -> {_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), + ok = file_handle_cache:obtain(), ok = rabbit_misc:recursive_delete([Dir]), State1. @@ -366,9 +367,9 @@ recover(DurableQueues) -> {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. all_queue_directory_names(Dir) -> - case file:list_dir(Dir) of + case prim_file:list_dir(Dir) of {ok, Entries} -> [ Entry || Entry <- Entries, - filelib:is_dir( + filelib2:is_dir( filename:join(Dir, Entry)) ]; {error, enoent} -> [] end. @@ -392,7 +393,7 @@ blank_state(QueueName) -> clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). detect_clean_shutdown(Dir) -> - case file:delete(clean_file_name(Dir)) of + case prim_file:delete(clean_file_name(Dir)) of ok -> true; {error, enoent} -> false end. @@ -402,7 +403,7 @@ read_shutdown_terms(Dir) -> store_clean_shutdown(Terms, Dir) -> CleanFileName = clean_file_name(Dir), - ok = filelib:ensure_dir(CleanFileName), + ok = filelib2:ensure_dir(CleanFileName), rabbit_misc:write_term_file(CleanFileName, Terms). init_clean(RecoveredCounts, State) -> @@ -603,8 +604,8 @@ flush_journal(State = #qistate { segments = Segments }) -> Segments1 = segment_fold( fun (#segment { unacked = 0, path = Path }, SegmentsN) -> - case filelib:is_file(Path) of - true -> ok = file:delete(Path); + case filelib2:is_file(Path) of + true -> ok = prim_file:delete(Path); false -> ok end, SegmentsN; @@ -630,7 +631,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries, get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), - ok = filelib:ensure_dir(Path), + ok = filelib2:ensure_dir(Path), {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; @@ -735,7 +736,7 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) -> lists:takewhile(fun (C) -> $0 =< C andalso C =< $9 end, SegName)), Set) end, sets:from_list(segment_nums(Segments)), - filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))). + filelib2:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))). segment_find_or_new(Seg, Dir, Segments) -> case segment_find(Seg, Segments) of @@ -836,7 +837,7 @@ segment_entries_foldr(Fun, Init, %% %% Does not do any combining with the journal at all. load_segment(KeepAcked, #segment { path = Path }) -> - case filelib:is_file(Path) of + case filelib2:is_file(Path) of false -> {array_new(), 0}; true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), @@ -1040,12 +1041,12 @@ foreach_queue_index(Funs) -> transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun), [ok = transform_file(filename:join(Dir, Seg), SegmentFun) - || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)], + || Seg <- filelib2:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)], ok = gatherer:finish(Gatherer). transform_file(Path, Fun) -> PathTmp = Path ++ ".upgrade", - case filelib:file_size(Path) of + case filelib2:file_size(Path) of 0 -> ok; Size -> {ok, PathTmpHdl} = file_handle_cache:open(PathTmp, ?WRITE_MODE, @@ -1059,7 +1060,7 @@ transform_file(Path, Fun) -> ok = drive_transform_fun(Fun, PathTmpHdl, Content), ok = file_handle_cache:close(PathTmpHdl), - ok = file:rename(PathTmp, Path) + ok = prim_file:rename(PathTmp, Path) end. drive_transform_fun(Fun, Hdl, Contents) -> diff --git a/src/serialiser.erl b/src/serialiser.erl new file mode 100644 index 00000000..0f9bcf17 --- /dev/null +++ b/src/serialiser.erl @@ -0,0 +1,75 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% + +-module(serialiser). + +-behaviour(gen_server2). + +-export([start_link/0, submit/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(submit/2 :: + (pid() | atom(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). + +submit(Pid, Fun) -> + gen_server2:call(Pid, {run, Fun}, infinity). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, nostate, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({run, Fun}, _From, State) -> + {reply, run(Fun), State, hibernate}; +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +run({M, F, A}) -> apply(M, F, A); +run(Fun) -> Fun(). |