diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-13 10:34:38 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-13 10:34:38 +0100 |
commit | 78b203937fd180fe1665f149586c78e9033d5989 (patch) | |
tree | 952307bcd71f7378da50cfe3e1da8d050706be52 | |
parent | 6a5a56ea911e68f739f2759139c4dc0a93230cba (diff) | |
parent | 0902083d9e9b39ecbaf9c38a5c2861bcd54faa07 (diff) | |
download | rabbitmq-server-78b203937fd180fe1665f149586c78e9033d5989.tar.gz |
merge default into bug24386
-rw-r--r-- | src/file2.erl | 946 | ||||
-rw-r--r-- | src/file_handle_cache.erl | 24 | ||||
-rw-r--r-- | src/filelib2.erl | 470 | ||||
-rw-r--r-- | src/io_runner.erl | 83 | ||||
-rw-r--r-- | src/rabbit.erl | 6 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 42 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 26 |
7 files changed, 1551 insertions, 46 deletions
diff --git a/src/file2.erl b/src/file2.erl new file mode 100644 index 00000000..d9ee1da8 --- /dev/null +++ b/src/file2.erl @@ -0,0 +1,946 @@ +%% This is a version of 'file' from R14B03, which runs calls to +%% file_server2 thorugh the worker_pool and uses the file_name/1 +%% function from R12B-5. Use this module when you expect a large +%% number of concurrent file operations. + +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1996-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(file2). + +%% Interface module for the file server and the file io servers. + + + +%%% External exports + +-export([format_error/1]). +%% File system and metadata. +-export([get_cwd/0, get_cwd/1, set_cwd/1, delete/1, rename/2, + make_dir/1, del_dir/1, list_dir/1, + read_file_info/1, write_file_info/2, + altname/1, + read_link_info/1, read_link/1, + make_link/2, make_symlink/2, + read_file/1, write_file/2, write_file/3]). +%% Specialized +-export([ipread_s32bu_p32bu/3]). +%% Generic file contents. +-export([open/2, close/1, advise/4, + read/2, write/2, + pread/2, pread/3, pwrite/2, pwrite/3, + read_line/1, + position/2, truncate/1, datasync/1, sync/1, + copy/2, copy/3]). +%% High level operations +-export([consult/1, path_consult/2]). +-export([eval/1, eval/2, path_eval/2, path_eval/3, path_open/3]). +-export([script/1, script/2, path_script/2, path_script/3]). +-export([change_owner/2, change_owner/3, change_group/2, + change_mode/2, change_time/2, change_time/3]). + +-export([pid2name/1]). + +%%% Obsolete exported functions + +-export([raw_read_file_info/1, raw_write_file_info/2]). + +%% Internal export to prim_file and ram_file until they implement +%% an efficient copy themselves. +-export([copy_opened/3]). + +-export([ipread_s32bu_p32bu_int/3]). + +%%% Includes and defines +-include_lib("kernel/include/file.hrl"). + +-define(FILE_IO_SERVER_TABLE, file_io_servers). + +-define(FILE_SERVER, file_server_2). % Registered name +-define(PRIM_FILE, prim_file). % Module +-define(RAM_FILE, ram_file). % Module + +%% data types +-type deep_list() :: [char() | atom() | deep_list()]. + +%%%----------------------------------------------------------------- +%%% General functions + +format_error({_Line, ?MODULE, undefined_script}) -> + "no value returned from script"; +format_error({Line, ?MODULE, {Class, Reason, Stacktrace}}) -> + io_lib:format("~w: evaluation failed with reason ~w:~w and stacktrace ~w", + [Line, Class, Reason, Stacktrace]); +format_error({Line, ?MODULE, {Reason, Stacktrace}}) -> + io_lib:format("~w: evaluation failed with reason ~w and stacktrace ~w", + [Line, Reason, Stacktrace]); +format_error({Line, Mod, Reason}) -> + io_lib:format("~w: ~s", [Line, Mod:format_error(Reason)]); +format_error(badarg) -> + "bad argument"; +format_error(system_limit) -> + "a system limit was hit, probably not enough ports"; +format_error(terminated) -> + "the file server process is terminated"; +format_error(ErrorId) -> + erl_posix_msg:message(ErrorId). + +pid2name(Pid) when is_pid(Pid) -> + case whereis(?FILE_SERVER) of + undefined -> + undefined; + _ -> + case ets:lookup(?FILE_IO_SERVER_TABLE, Pid) of + [{_, Name} | _] -> + {ok, Name}; + _ -> + undefined + end + end. + +%%%----------------------------------------------------------------- +%%% File server functions. +%%% Functions that do not operate on a single open file. +%%% Stateless. +get_cwd() -> + call(get_cwd, []). + +get_cwd(Drive) -> + check_and_call(get_cwd, [file_name(Drive)]). + +set_cwd(Dirname) -> + check_and_call(set_cwd, [file_name(Dirname)]). + +delete(Name) -> + check_and_call(delete, [file_name(Name)]). + +rename(From, To) -> + check_and_call(rename, [file_name(From), file_name(To)]). + +make_dir(Name) -> + check_and_call(make_dir, [file_name(Name)]). + +del_dir(Name) -> + check_and_call(del_dir, [file_name(Name)]). + +read_file_info(Name) -> + check_and_call(read_file_info, [file_name(Name)]). + +altname(Name) -> + check_and_call(altname, [file_name(Name)]). + +read_link_info(Name) -> + check_and_call(read_link_info, [file_name(Name)]). + +read_link(Name) -> + check_and_call(read_link, [file_name(Name)]). + +write_file_info(Name, Info = #file_info{}) -> + check_and_call(write_file_info, [file_name(Name), Info]). + +list_dir(Name) -> + check_and_call(list_dir, [file_name(Name)]). + +read_file(Name) -> + check_and_call(read_file, [file_name(Name)]). + +make_link(Old, New) -> + check_and_call(make_link, [file_name(Old), file_name(New)]). + +make_symlink(Old, New) -> + check_and_call(make_symlink, [file_name(Old), file_name(New)]). + +write_file(Name, Bin) -> + check_and_call(write_file, [file_name(Name), make_binary(Bin)]). + +%% This whole operation should be moved to the file_server and prim_file +%% when it is time to change file server protocol again. +%% Meanwhile, it is implemented here, slightly less efficient. + +write_file(Name, Bin, ModeList) when is_list(ModeList) -> + case make_binary(Bin) of + B when is_binary(B) -> + case open(Name, [binary, write | + lists:delete(binary, + lists:delete(write, ModeList))]) of + {ok, Handle} -> + case write(Handle, B) of + ok -> + close(Handle); + E1 -> + close(Handle), + E1 + end; + E2 -> + E2 + end; + E3 -> + E3 + end. + +%% Obsolete, undocumented, local node only, don't use!. +%% XXX to be removed. +raw_read_file_info(Name) -> + Args = [file_name(Name)], + case check_args(Args) of + ok -> + [FileName] = Args, + ?PRIM_FILE:read_file_info(FileName); + Error -> + Error + end. + +%% Obsolete, undocumented, local node only, don't use!. +%% XXX to be removed. +raw_write_file_info(Name, #file_info{} = Info) -> + Args = [file_name(Name)], + case check_args(Args) of + ok -> + [FileName] = Args, + ?PRIM_FILE:write_file_info(FileName, Info); + Error -> + Error + end. + +%%%----------------------------------------------------------------- +%%% File io server functions. +%%% They operate on a single open file. +%%% Stateful. + +%% Contemporary mode specification - list of options + +open(Item, ModeList) when is_list(ModeList) -> + case lists:member(raw, ModeList) of + %% Raw file, use ?PRIM_FILE to handle this file + true -> + %% check if raw file mode is disabled + case catch application:get_env(kernel, raw_files) of + {ok,false} -> + open(Item, lists:delete(raw, ModeList)); + _ -> % undefined | {ok,true} + Args = [file_name(Item) | ModeList], + case check_args(Args) of + ok -> + [FileName | _] = Args, + %% We rely on the returned Handle (in {ok, Handle}) + %% being a pid() or a #file_descriptor{} + ?PRIM_FILE:open(FileName, ModeList); + Error -> + Error + end + end; + false -> + case lists:member(ram, ModeList) of + %% RAM file, use ?RAM_FILE to handle this file + true -> + case check_args(ModeList) of + ok -> + ?RAM_FILE:open(Item, ModeList); + Error -> + Error + end; + %% File server file + false -> + Args = [file_name(Item) | ModeList], + case check_args(Args) of + ok -> + [FileName | _] = Args, + call(open, [FileName, ModeList]); + Error -> + Error + end + end + end; +%% Old obsolete mode specification in atom or 2-tuple format +open(Item, Mode) -> + open(Item, mode_list(Mode)). + +%%%----------------------------------------------------------------- +%%% The following interface functions operate on open files. +%%% The File argument must be either a Pid or a handle +%%% returned from ?PRIM_FILE:open. + +close(File) when is_pid(File) -> + R = file_request(File, close), + case wait_file_reply(File, R) of + {error, terminated} -> + ok; + Other -> + Other + end; +%% unlink(File), +%% exit(File, close), +%% ok; +close(#file_descriptor{module = Module} = Handle) -> + Module:close(Handle); +close(_) -> + {error, badarg}. + +advise(File, Offset, Length, Advise) when is_pid(File) -> + R = file_request(File, {advise, Offset, Length, Advise}), + wait_file_reply(File, R); +advise(#file_descriptor{module = Module} = Handle, Offset, Length, Advise) -> + Module:advise(Handle, Offset, Length, Advise); +advise(_, _, _, _) -> + {error, badarg}. + +read(File, Sz) when (is_pid(File) orelse is_atom(File)), is_integer(Sz), Sz >= 0 -> + case io:request(File, {get_chars, '', Sz}) of + Data when is_list(Data); is_binary(Data) -> + {ok, Data}; + Other -> + Other + end; +read(#file_descriptor{module = Module} = Handle, Sz) + when is_integer(Sz), Sz >= 0 -> + Module:read(Handle, Sz); +read(_, _) -> + {error, badarg}. + +read_line(File) when (is_pid(File) orelse is_atom(File)) -> + case io:request(File, {get_line, ''}) of + Data when is_list(Data); is_binary(Data) -> + {ok, Data}; + Other -> + Other + end; +read_line(#file_descriptor{module = Module} = Handle) -> + Module:read_line(Handle); +read_line(_) -> + {error, badarg}. + +pread(File, L) when is_pid(File), is_list(L) -> + pread_int(File, L, []); +pread(#file_descriptor{module = Module} = Handle, L) when is_list(L) -> + Module:pread(Handle, L); +pread(_, _) -> + {error, badarg}. + +pread_int(_File, [], R) -> + {ok, lists:reverse(R)}; +pread_int(File, [{At, Sz} | T], R) when is_integer(Sz), Sz >= 0 -> + case pread(File, At, Sz) of + {ok, Data} -> + pread_int(File, T, [Data | R]); + eof -> + pread_int(File, T, [eof | R]); + {error, _} = Error -> + Error + end; +pread_int(_, _, _) -> + {error, badarg}. + +pread(File, At, Sz) when is_pid(File), is_integer(Sz), Sz >= 0 -> + R = file_request(File, {pread, At, Sz}), + wait_file_reply(File, R); +pread(#file_descriptor{module = Module} = Handle, Offs, Sz) + when is_integer(Sz), Sz >= 0 -> + Module:pread(Handle, Offs, Sz); +pread(_, _, _) -> + {error, badarg}. + +write(File, Bytes) when (is_pid(File) orelse is_atom(File)) -> + case make_binary(Bytes) of + Bin when is_binary(Bin) -> + io:request(File, {put_chars,Bin}); + Error -> + Error + end; +write(#file_descriptor{module = Module} = Handle, Bytes) -> + Module:write(Handle, Bytes); +write(_, _) -> + {error, badarg}. + +pwrite(File, L) when is_pid(File), is_list(L) -> + pwrite_int(File, L, 0); +pwrite(#file_descriptor{module = Module} = Handle, L) when is_list(L) -> + Module:pwrite(Handle, L); +pwrite(_, _) -> + {error, badarg}. + +pwrite_int(_File, [], _R) -> + ok; +pwrite_int(File, [{At, Bytes} | T], R) -> + case pwrite(File, At, Bytes) of + ok -> + pwrite_int(File, T, R+1); + {error, Reason} -> + {error, {R, Reason}} + end; +pwrite_int(_, _, _) -> + {error, badarg}. + +pwrite(File, At, Bytes) when is_pid(File) -> + R = file_request(File, {pwrite, At, Bytes}), + wait_file_reply(File, R); +pwrite(#file_descriptor{module = Module} = Handle, Offs, Bytes) -> + Module:pwrite(Handle, Offs, Bytes); +pwrite(_, _, _) -> + {error, badarg}. + +datasync(File) when is_pid(File) -> + R = file_request(File, datasync), + wait_file_reply(File, R); +datasync(#file_descriptor{module = Module} = Handle) -> + Module:datasync(Handle); +datasync(_) -> + {error, badarg}. + +sync(File) when is_pid(File) -> + R = file_request(File, sync), + wait_file_reply(File, R); +sync(#file_descriptor{module = Module} = Handle) -> + Module:sync(Handle); +sync(_) -> + {error, badarg}. + +position(File, At) when is_pid(File) -> + R = file_request(File, {position,At}), + wait_file_reply(File, R); +position(#file_descriptor{module = Module} = Handle, At) -> + Module:position(Handle, At); +position(_, _) -> + {error, badarg}. + +truncate(File) when is_pid(File) -> + R = file_request(File, truncate), + wait_file_reply(File, R); +truncate(#file_descriptor{module = Module} = Handle) -> + Module:truncate(Handle); +truncate(_) -> + {error, badarg}. + +copy(Source, Dest) -> + copy_int(Source, Dest, infinity). + +copy(Source, Dest, Length) + when is_integer(Length), Length >= 0; + is_atom(Length) -> + copy_int(Source, Dest, Length); +copy(_, _, _) -> + {error, badarg}. + +%% Here we know that Length is either an atom or an integer >= 0 +%% (by the way, atoms > integers) +%% +%% Copy between open files. +copy_int(Source, Dest, Length) + when is_pid(Source), is_pid(Dest); + is_pid(Source), is_record(Dest, file_descriptor); + is_record(Source, file_descriptor), is_pid(Dest) -> + copy_opened_int(Source, Dest, Length, 0); +%% Copy between open raw files, both handled by the same module +copy_int(#file_descriptor{module = Module} = Source, + #file_descriptor{module = Module} = Dest, + Length) -> + Module:copy(Source, Dest, Length); +%% Copy between open raw files of different modules +copy_int(#file_descriptor{} = Source, + #file_descriptor{} = Dest, Length) -> + copy_opened_int(Source, Dest, Length, 0); +%% Copy between filenames, let the server do the copy +copy_int({SourceName, SourceOpts}, {DestName, DestOpts}, Length) + when is_list(SourceOpts), is_list(DestOpts) -> + check_and_call(copy, + [file_name(SourceName), SourceOpts, + file_name(DestName), DestOpts, + Length]); +%% Filename -> open file; must open Source and do client copy +copy_int({SourceName, SourceOpts}, Dest, Length) + when is_list(SourceOpts), is_pid(Dest); + is_list(SourceOpts), is_record(Dest, file_descriptor) -> + case file_name(SourceName) of + {error, _} = Error -> + Error; + Source -> + case open(Source, [read | SourceOpts]) of + {ok, Handle} -> + Result = copy_opened_int(Handle, Dest, Length, 0), + close(Handle), + Result; + {error, _} = Error -> + Error + end + end; +%% Open file -> filename; must open Dest and do client copy +copy_int(Source, {DestName, DestOpts}, Length) + when is_pid(Source), is_list(DestOpts); + is_record(Source, file_descriptor), is_list(DestOpts) -> + case file_name(DestName) of + {error, _} = Error -> + Error; + Dest -> + case open(Dest, [write | DestOpts]) of + {ok, Handle} -> + Result = copy_opened_int(Source, Handle, Length, 0), + close(Handle), + Result; + {error, _} = Error -> + Error + end + end; +%% +%% That was all combinations of {Name, Opts} tuples +%% and open files. At least one of Source and Dest has +%% to be a bare filename. +%% +%% If Source is not a bare filename; Dest must be +copy_int(Source, Dest, Length) + when is_pid(Source); + is_record(Source, file_descriptor) -> + copy_int(Source, {Dest, []}, Length); +copy_int({_SourceName, SourceOpts} = Source, Dest, Length) + when is_list(SourceOpts) -> + copy_int(Source, {Dest, []}, Length); +%% If Dest is not a bare filename; Source must be +copy_int(Source, Dest, Length) + when is_pid(Dest); + is_record(Dest, file_descriptor) -> + copy_int({Source, []}, Dest, Length); +copy_int(Source, {_DestName, DestOpts} = Dest, Length) + when is_list(DestOpts) -> + copy_int({Source, []}, Dest, Length); +%% Both must be bare filenames. If they are not, +%% the filename check in the copy operation will yell. +copy_int(Source, Dest, Length) -> + copy_int({Source, []}, {Dest, []}, Length). + + + +copy_opened(Source, Dest, Length) + when is_integer(Length), Length >= 0; + is_atom(Length) -> + copy_opened_int(Source, Dest, Length); +copy_opened(_, _, _) -> + {error, badarg}. + +%% Here we know that Length is either an atom or an integer >= 0 +%% (by the way, atoms > integers) + +copy_opened_int(Source, Dest, Length) + when is_pid(Source), is_pid(Dest) -> + copy_opened_int(Source, Dest, Length, 0); +copy_opened_int(Source, Dest, Length) + when is_pid(Source), is_record(Dest, file_descriptor) -> + copy_opened_int(Source, Dest, Length, 0); +copy_opened_int(Source, Dest, Length) + when is_record(Source, file_descriptor), is_pid(Dest) -> + copy_opened_int(Source, Dest, Length, 0); +copy_opened_int(Source, Dest, Length) + when is_record(Source, file_descriptor), is_record(Dest, file_descriptor) -> + copy_opened_int(Source, Dest, Length, 0); +copy_opened_int(_, _, _) -> + {error, badarg}. + +%% Here we know that Source and Dest are handles to open files, Length is +%% as above, and Copied is an integer >= 0 + +%% Copy loop in client process +copy_opened_int(_, _, Length, Copied) when Length =< 0 -> % atom() > integer() + {ok, Copied}; +copy_opened_int(Source, Dest, Length, Copied) -> + N = if Length > 65536 -> 65536; true -> Length end, % atom() > integer() ! + case read(Source, N) of + {ok, Data} -> + M = if is_binary(Data) -> byte_size(Data); + is_list(Data) -> length(Data) + end, + case write(Dest, Data) of + ok -> + if M < N -> + %% Got less than asked for - must be end of file + {ok, Copied+M}; + true -> + %% Decrement Length (might be an atom (infinity)) + NewLength = if is_atom(Length) -> Length; + true -> Length-M + end, + copy_opened_int(Source, Dest, NewLength, Copied+M) + end; + {error, _} = Error -> + Error + end; + eof -> + {ok, Copied}; + {error, _} = Error -> + Error + end. + + +%% Special indirect pread function. Introduced for Dets. +%% Reads a header from pos 'Pos', the header is first a size encoded as +%% 32 bit big endian unsigned and then a position also encoded as +%% 32 bit big endian. Finally it preads the data from that pos and size +%% in the file. + +ipread_s32bu_p32bu(File, Pos, MaxSize) when is_pid(File) -> + ipread_s32bu_p32bu_int(File, Pos, MaxSize); +ipread_s32bu_p32bu(#file_descriptor{module = Module} = Handle, Pos, MaxSize) -> + Module:ipread_s32bu_p32bu(Handle, Pos, MaxSize); +ipread_s32bu_p32bu(_, _, _) -> + {error, badarg}. + +ipread_s32bu_p32bu_int(File, Pos, Infinity) when is_atom(Infinity) -> + ipread_s32bu_p32bu_int(File, Pos, (1 bsl 31)-1); +ipread_s32bu_p32bu_int(File, Pos, MaxSize) + when is_integer(MaxSize), MaxSize >= 0 -> + if + MaxSize < (1 bsl 31) -> + case pread(File, Pos, 8) of + {ok, Header} -> + ipread_s32bu_p32bu_2(File, Header, MaxSize); + Error -> + Error + end; + true -> + {error, einval} + end; +ipread_s32bu_p32bu_int(_File, _Pos, _MaxSize) -> + {error, badarg}. + +ipread_s32bu_p32bu_2(_File, + <<0:32/big-unsigned, Pos:32/big-unsigned>>, + _MaxSize) -> + {ok, {0, Pos, eof}}; +ipread_s32bu_p32bu_2(File, + <<Size:32/big-unsigned, Pos:32/big-unsigned>>, + MaxSize) + when Size =< MaxSize -> + case pread(File, Pos, Size) of + {ok, Data} -> + {ok, {Size, Pos, Data}}; + eof -> + {ok, {Size, Pos, eof}}; + Error -> + Error + end; +ipread_s32bu_p32bu_2(_File, + <<_:8/binary>>, + _MaxSize) -> + eof; +ipread_s32bu_p32bu_2(_File, + <<_/binary>>, + _MaxSize) -> + eof; +ipread_s32bu_p32bu_2(File, + Header, + MaxSize) when is_list(Header) -> + ipread_s32bu_p32bu_2(File, list_to_binary(Header), MaxSize). + + + +%%%----------------------------------------------------------------- +%%% The following functions, built upon the other interface functions, +%%% provide a higher-lever interface to files. + +consult(File) -> + case open(File, [read]) of + {ok, Fd} -> + R = consult_stream(Fd), + close(Fd), + R; + Error -> + Error + end. + +path_consult(Path, File) -> + case path_open(Path, File, [read]) of + {ok, Fd, Full} -> + case consult_stream(Fd) of + {ok, List} -> + close(Fd), + {ok, List, Full}; + E1 -> + close(Fd), + E1 + end; + E2 -> + E2 + end. + +eval(File) -> + eval(File, erl_eval:new_bindings()). + +eval(File, Bs) -> + case open(File, [read]) of + {ok, Fd} -> + R = eval_stream(Fd, ignore, Bs), + close(Fd), + R; + Error -> + Error + end. + +path_eval(Path, File) -> + path_eval(Path, File, erl_eval:new_bindings()). + +path_eval(Path, File, Bs) -> + case path_open(Path, File, [read]) of + {ok, Fd, Full} -> + case eval_stream(Fd, ignore, Bs) of + ok -> + close(Fd), + {ok, Full}; + E1 -> + close(Fd), + E1 + end; + E2 -> + E2 + end. + +script(File) -> + script(File, erl_eval:new_bindings()). + +script(File, Bs) -> + case open(File, [read]) of + {ok, Fd} -> + R = eval_stream(Fd, return, Bs), + close(Fd), + R; + Error -> + Error + end. + +path_script(Path, File) -> + path_script(Path, File, erl_eval:new_bindings()). + +path_script(Path, File, Bs) -> + case path_open(Path, File, [read]) of + {ok,Fd,Full} -> + case eval_stream(Fd, return, Bs) of + {ok,R} -> + close(Fd), + {ok, R, Full}; + E1 -> + close(Fd), + E1 + end; + E2 -> + E2 + end. + + +%% path_open(Paths, Filename, Mode) -> +%% {ok,FileDescriptor,FullName} +%% {error,Reason} +%% +%% Searches the Paths for file Filename which can be opened with Mode. +%% The path list is ignored if Filename contains an absolute path. + +path_open(PathList, Name, Mode) -> + case file_name(Name) of + {error, _} = Error -> + Error; + FileName -> + case filename:pathtype(FileName) of + relative -> + path_open_first(PathList, FileName, Mode, enoent); + _ -> + case open(Name, Mode) of + {ok, Fd} -> + {ok, Fd, Name}; + Error -> + Error + end + end + end. + +change_mode(Name, Mode) + when is_integer(Mode) -> + write_file_info(Name, #file_info{mode=Mode}). + +change_owner(Name, OwnerId) + when is_integer(OwnerId) -> + write_file_info(Name, #file_info{uid=OwnerId}). + +change_owner(Name, OwnerId, GroupId) + when is_integer(OwnerId), is_integer(GroupId) -> + write_file_info(Name, #file_info{uid=OwnerId, gid=GroupId}). + +change_group(Name, GroupId) + when is_integer(GroupId) -> + write_file_info(Name, #file_info{gid=GroupId}). + +change_time(Name, Time) + when is_tuple(Time) -> + write_file_info(Name, #file_info{mtime=Time}). + +change_time(Name, Atime, Mtime) + when is_tuple(Atime), is_tuple(Mtime) -> + write_file_info(Name, #file_info{atime=Atime, mtime=Mtime}). + +%%%----------------------------------------------------------------- +%%% Helpers + +consult_stream(Fd) -> + consult_stream(Fd, 1, []). + +consult_stream(Fd, Line, Acc) -> + case io:read(Fd, '', Line) of + {ok,Term,EndLine} -> + consult_stream(Fd, EndLine, [Term|Acc]); + {error,Error,_Line} -> + {error,Error}; + {eof,_Line} -> + {ok,lists:reverse(Acc)} + end. + +eval_stream(Fd, Handling, Bs) -> + eval_stream(Fd, Handling, 1, undefined, [], Bs). + +eval_stream(Fd, H, Line, Last, E, Bs) -> + eval_stream2(io:parse_erl_exprs(Fd, '', Line), Fd, H, Last, E, Bs). + +eval_stream2({ok,Form,EndLine}, Fd, H, Last, E, Bs0) -> + try erl_eval:exprs(Form, Bs0) of + {value,V,Bs} -> + eval_stream(Fd, H, EndLine, {V}, E, Bs) + catch Class:Reason -> + Error = {EndLine,?MODULE,{Class,Reason,erlang:get_stacktrace()}}, + eval_stream(Fd, H, EndLine, Last, [Error|E], Bs0) + end; +eval_stream2({error,What,EndLine}, Fd, H, Last, E, Bs) -> + eval_stream(Fd, H, EndLine, Last, [What | E], Bs); +eval_stream2({eof,EndLine}, _Fd, H, Last, E, _Bs) -> + case {H, Last, E} of + {return, {Val}, []} -> + {ok, Val}; + {return, undefined, E} -> + {error, hd(lists:reverse(E, [{EndLine,?MODULE,undefined_script}]))}; + {ignore, _, []} -> + ok; + {_, _, [_|_] = E} -> + {error, hd(lists:reverse(E))} + end. + +path_open_first([Path|Rest], Name, Mode, LastError) -> + case file_name(Path) of + {error, _} = Error -> + Error; + FilePath -> + FileName = filename:join(FilePath, Name), + case open(FileName, Mode) of + {ok, Fd} -> + {ok, Fd, FileName}; + {error, enoent} -> + path_open_first(Rest, Name, Mode, LastError); + Error -> + Error + end + end; +path_open_first([], _Name, _Mode, LastError) -> + {error, LastError}. + +%%%----------------------------------------------------------------- +%%% Utility functions. + +%% file_name(FileName) +%% Generates a flat file name from a deep list of atoms and +%% characters (integers). + +file_name(N) -> + try + file_name_1(N) + catch Reason -> + {error, Reason} + end. + +file_name_1([C|T]) when is_integer(C), C > 0, C =< 255 -> + [C|file_name_1(T)]; +file_name_1([H|T]) -> + file_name_1(H) ++ file_name_1(T); +file_name_1([]) -> + []; +file_name_1(N) when is_atom(N) -> + atom_to_list(N); +file_name_1(_) -> + throw(badarg). + +make_binary(Bin) when is_binary(Bin) -> + Bin; +make_binary(List) -> + %% Convert the list to a binary in order to avoid copying a list + %% to the file server. + try + erlang:iolist_to_binary(List) + catch error:Reason -> + {error, Reason} + end. + +mode_list(read) -> + [read]; +mode_list(write) -> + [write]; +mode_list(read_write) -> + [read, write]; +mode_list({binary, Mode}) when is_atom(Mode) -> + [binary | mode_list(Mode)]; +mode_list({character, Mode}) when is_atom(Mode) -> + mode_list(Mode); +mode_list(_) -> + [{error, badarg}]. + +%%----------------------------------------------------------------- +%% Functions for communicating with the file server + +call(Command, Args) when is_list(Args) -> + io_runner:submit( + fun () -> + gen_server:call(?FILE_SERVER, list_to_tuple([Command | Args]), + infinity) + end). + +check_and_call(Command, Args) when is_list(Args) -> + case check_args(Args) of + ok -> + call(Command, Args); + Error -> + Error + end. + +check_args([{error, _}=Error|_Rest]) -> + Error; +check_args([_Name|Rest]) -> + check_args(Rest); +check_args([]) -> + ok. + +%%----------------------------------------------------------------- +%% Functions for communicating with a file io server. +%% The messages sent have the following formats: +%% +%% {file_request,From,ReplyAs,Request} +%% {file_reply,ReplyAs,Reply} + +file_request(Io, Request) -> + R = erlang:monitor(process, Io), + Io ! {file_request,self(),Io,Request}, + R. + +wait_file_reply(From, Ref) -> + receive + {file_reply,From,Reply} -> + erlang:demonitor(Ref), + receive {'DOWN', Ref, _, _, _} -> ok after 0 -> ok end, + %% receive {'EXIT', From, _} -> ok after 0 -> ok end, + Reply; + {'DOWN', Ref, _, _, _} -> + %% receive {'EXIT', From, _} -> ok after 0 -> ok end, + {error, terminated} + end. diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 3c2111dc..39be0e9b 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -317,7 +317,7 @@ read(Ref, Count) -> fun ([#handle { is_read = false }]) -> {error, not_open_for_reading}; ([Handle = #handle { hdl = Hdl, offset = Offset }]) -> - case file:read(Hdl, Count) of + case file2:read(Hdl, Count) of {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data), {Obj, [Handle #handle { offset = Offset1 }]}; @@ -337,7 +337,7 @@ append(Ref, Data) -> write_buffer_size_limit = 0, at_eof = true } = Handle1} -> Offset1 = Offset + iolist_size(Data), - {file:write(Hdl, Data), + {file2:write(Hdl, Data), [Handle1 #handle { is_dirty = true, offset = Offset1 }]}; {{ok, _Offset}, #handle { write_buffer = WriteBuffer, write_buffer_size = Size, @@ -364,7 +364,7 @@ sync(Ref) -> ok; ([Handle = #handle { hdl = Hdl, is_dirty = true, write_buffer = [] }]) -> - case file:sync(Hdl) of + case file2:sync(Hdl) of ok -> {ok, [Handle #handle { is_dirty = false }]}; Error -> {Error, [Handle]} end @@ -381,7 +381,7 @@ truncate(Ref) -> with_flushed_handles( [Ref], fun ([Handle1 = #handle { hdl = Hdl }]) -> - case file:truncate(Hdl) of + case file2:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; Error -> {Error, [Handle1]} end @@ -408,7 +408,7 @@ copy(Src, Dest, Count) -> fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset }, DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }] ) -> - case file:copy(SHdl, DHdl, Count) of + case file2:copy(SHdl, DHdl, Count) of {ok, Count1} = Result1 -> {Result1, [SHandle #handle { offset = SOffset + Count1 }, @@ -428,7 +428,7 @@ delete(Ref) -> Handle = #handle { path = Path } -> case hard_close(Handle #handle { is_dirty = false, write_buffer = [] }) of - ok -> file:delete(Path); + ok -> file2:delete(Path); {Error, Handle1} -> put_handle(Ref, Handle1), Error end @@ -443,7 +443,7 @@ clear(Ref) -> case maybe_seek(bof, Handle #handle { write_buffer = [], write_buffer_size = 0 }) of {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> - case file:truncate(Hdl) of + case file2:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; Error -> {Error, [Handle1]} end; @@ -566,7 +566,7 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, offset = Offset, last_used_at = undefined }} | RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> - case file:open(Path, case NewOrReopen of + case file2:open(Path, case NewOrReopen of new -> Mode; reopen -> [read | Mode] end) of @@ -693,10 +693,10 @@ soft_close(Handle) -> is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of - true -> file:sync(Hdl); + true -> file2:sync(Hdl); false -> ok end, - ok = file:close(Hdl), + ok = file2:close(Hdl), age_tree_delete(Then), {ok, Handle1 #handle { hdl = closed, is_dirty = false, @@ -731,7 +731,7 @@ maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset, at_eof = AtEoF }) -> {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), case (case NeedsSeek of - true -> file:position(Hdl, NewOffset); + true -> file2:position(Hdl, NewOffset); false -> {ok, Offset} end) of {ok, Offset1} = Result -> @@ -768,7 +768,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, write_buffer = WriteBuffer, write_buffer_size = DataSize, at_eof = true }) -> - case file:write(Hdl, lists:reverse(WriteBuffer)) of + case file2:write(Hdl, lists:reverse(WriteBuffer)) of ok -> Offset1 = Offset + DataSize, {ok, Handle #handle { offset = Offset1, is_dirty = true, diff --git a/src/filelib2.erl b/src/filelib2.erl new file mode 100644 index 00000000..8fff0402 --- /dev/null +++ b/src/filelib2.erl @@ -0,0 +1,470 @@ +%% This is a version of 'filelib' from R14B03, which uses 'file2' +%% instead of 'file'. Use this module when you expect a large number +%% of concurrent file operations. + +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% + +-module(filelib2). + +%% File utilities. + +%% Avoid warning for local function error/1 clashing with autoimported BIF. +-compile({no_auto_import,[error/1]}). +-export([wildcard/1, wildcard/2, is_dir/1, is_file/1, is_regular/1, + compile_wildcard/1]). +-export([fold_files/5, last_modified/1, file_size/1, ensure_dir/1]). + +-export([wildcard/3, is_dir/2, is_file/2, is_regular/2]). +-export([fold_files/6, last_modified/2, file_size/2]). + +-include_lib("kernel/include/file.hrl"). + +-define(HANDLE_ERROR(Expr), + try + Expr + catch + error:{badpattern,_}=UnUsUalVaRiAbLeNaMe -> + %% Get the stack backtrace correct. + erlang:error(UnUsUalVaRiAbLeNaMe) + end). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +wildcard(Pattern) when is_list(Pattern) -> + ?HANDLE_ERROR(do_wildcard(Pattern, file)). + +wildcard(Pattern, Cwd) when is_list(Pattern), (is_list(Cwd) or is_binary(Cwd)) -> + ?HANDLE_ERROR(do_wildcard(Pattern, Cwd, file)); +wildcard(Pattern, Mod) when is_list(Pattern), is_atom(Mod) -> + ?HANDLE_ERROR(do_wildcard(Pattern, Mod)). + +-spec wildcard(file:name(), file:name(), atom()) -> [file:filename()]. +wildcard(Pattern, Cwd, Mod) + when is_list(Pattern), (is_list(Cwd) or is_binary(Cwd)), is_atom(Mod) -> + ?HANDLE_ERROR(do_wildcard(Pattern, Cwd, Mod)). + +is_dir(Dir) -> + do_is_dir(Dir, file). + +is_dir(Dir, Mod) when is_atom(Mod) -> + do_is_dir(Dir, Mod). + +is_file(File) -> + do_is_file(File, file). + +is_file(File, Mod) when is_atom(Mod) -> + do_is_file(File, Mod). + +is_regular(File) -> + do_is_regular(File, file). + +is_regular(File, Mod) when is_atom(Mod) -> + do_is_regular(File, Mod). + +fold_files(Dir, RegExp, Recursive, Fun, Acc) -> + do_fold_files(Dir, RegExp, Recursive, Fun, Acc, file). + +fold_files(Dir, RegExp, Recursive, Fun, Acc, Mod) when is_atom(Mod) -> + do_fold_files(Dir, RegExp, Recursive, Fun, Acc, Mod). + +last_modified(File) -> + do_last_modified(File, file). + +-spec last_modified(file:name(), atom()) -> file:date_time() | 0. +last_modified(File, Mod) when is_atom(Mod) -> + do_last_modified(File, Mod). + +file_size(File) -> + do_file_size(File, file). + +-spec file_size(file:name(), atom()) -> non_neg_integer(). +file_size(File, Mod) when is_atom(Mod) -> + do_file_size(File, Mod). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +do_wildcard(Pattern, Mod) when is_list(Pattern) -> + do_wildcard_comp(do_compile_wildcard(Pattern), Mod). + +do_wildcard_comp({compiled_wildcard,{exists,File}}, Mod) -> + case eval_read_file_info(File, Mod) of + {ok,_} -> [File]; + _ -> [] + end; +do_wildcard_comp({compiled_wildcard,[Base|Rest]}, Mod) -> + do_wildcard_1([Base], Rest, Mod). + +do_wildcard(Pattern, Cwd, Mod) when is_list(Pattern), (is_list(Cwd) or is_binary(Cwd)) -> + do_wildcard_comp(do_compile_wildcard(Pattern), Cwd, Mod). + +do_wildcard_comp({compiled_wildcard,{exists,File}}, Cwd, Mod) -> + case eval_read_file_info(filename:absname(File, Cwd), Mod) of + {ok,_} -> [File]; + _ -> [] + end; +do_wildcard_comp({compiled_wildcard,[current|Rest]}, Cwd0, Mod) -> + {Cwd,PrefixLen} = case filename:join([Cwd0]) of + Bin when is_binary(Bin) -> {Bin,byte_size(Bin)+1}; + Other -> {Other,length(Other)+1} + end, %Slash away redundant slashes. + [ + if + is_binary(N) -> + <<_:PrefixLen/binary,Res/binary>> = N, + Res; + true -> + lists:nthtail(PrefixLen, N) + end || N <- do_wildcard_1([Cwd], Rest, Mod)]; +do_wildcard_comp({compiled_wildcard,[Base|Rest]}, _Cwd, Mod) -> + do_wildcard_1([Base], Rest, Mod). + +do_is_dir(Dir, Mod) -> + case eval_read_file_info(Dir, Mod) of + {ok, #file_info{type=directory}} -> + true; + _ -> + false + end. + +do_is_file(File, Mod) -> + case eval_read_file_info(File, Mod) of + {ok, #file_info{type=regular}} -> + true; + {ok, #file_info{type=directory}} -> + true; + _ -> + false + end. + +do_is_regular(File, Mod) -> + case eval_read_file_info(File, Mod) of + {ok, #file_info{type=regular}} -> + true; + _ -> + false + end. + +%% fold_files(Dir, RegExp, Recursive, Fun, AccIn). + +%% folds the function Fun(F, Acc) -> Acc1 over +%% all files <F> in <Dir> that match the regular expression <RegExp> +%% If <Recursive> is true all sub-directories to <Dir> are processed + +do_fold_files(Dir, RegExp, Recursive, Fun, Acc, Mod) -> + {ok, Re1} = re:compile(RegExp,[unicode]), + do_fold_files1(Dir, Re1, RegExp, Recursive, Fun, Acc, Mod). + +do_fold_files1(Dir, RegExp, OrigRE, Recursive, Fun, Acc, Mod) -> + case eval_list_dir(Dir, Mod) of + {ok, Files} -> do_fold_files2(Files, Dir, RegExp, OrigRE, + Recursive, Fun, Acc, Mod); + {error, _} -> Acc + end. + +%% OrigRE is not to be compiled as it's for non conforming filenames, +%% i.e. for filenames that does not comply to the current encoding, which should +%% be very rare. We use it only in those cases and do not want to precompile. +do_fold_files2([], _Dir, _RegExp, _OrigRE, _Recursive, _Fun, Acc, _Mod) -> + Acc; +do_fold_files2([File|T], Dir, RegExp, OrigRE, Recursive, Fun, Acc0, Mod) -> + FullName = filename:join(Dir, File), + case do_is_regular(FullName, Mod) of + true -> + case (catch re:run(File, if is_binary(File) -> OrigRE; + true -> RegExp end, + [{capture,none}])) of + match -> + Acc = Fun(FullName, Acc0), + do_fold_files2(T, Dir, RegExp, OrigRE, Recursive, Fun, Acc, Mod); + {'EXIT',_} -> + do_fold_files2(T, Dir, RegExp, OrigRE, Recursive, Fun, Acc0, Mod); + nomatch -> + do_fold_files2(T, Dir, RegExp, OrigRE, Recursive, Fun, Acc0, Mod) + end; + false -> + case Recursive andalso do_is_dir(FullName, Mod) of + true -> + Acc1 = do_fold_files1(FullName, RegExp, OrigRE, Recursive, + Fun, Acc0, Mod), + do_fold_files2(T, Dir, RegExp, OrigRE, Recursive, Fun, Acc1, Mod); + false -> + do_fold_files2(T, Dir, RegExp, OrigRE, Recursive, Fun, Acc0, Mod) + end + end. + +do_last_modified(File, Mod) -> + case eval_read_file_info(File, Mod) of + {ok, Info} -> + Info#file_info.mtime; + _ -> + 0 + end. + +do_file_size(File, Mod) -> + case eval_read_file_info(File, Mod) of + {ok, Info} -> + Info#file_info.size; + _ -> + 0 + end. + +%%---------------------------------------------------------------------- +%% +type ensure_dir(X) -> ok | {error, Reason}. +%% +type X = filename() | dirname() +%% ensures that the directory name required to create D exists + +ensure_dir("/") -> + ok; +ensure_dir(F) -> + Dir = filename:dirname(F), + case do_is_dir(Dir, file) of + true -> + ok; + false -> + ensure_dir(Dir), + case file2:make_dir(Dir) of + {error,eexist}=EExist -> + case do_is_dir(Dir, file) of + true -> + ok; + false -> + EExist + end; + Err -> + Err + end + end. + + +%%% +%%% Pattern matching using a compiled wildcard. +%%% + +do_wildcard_1(Files, Pattern, Mod) -> + do_wildcard_2(Files, Pattern, [], Mod). + +do_wildcard_2([File|Rest], Pattern, Result, Mod) -> + do_wildcard_2(Rest, Pattern, do_wildcard_3(File, Pattern, Result, Mod), Mod); +do_wildcard_2([], _, Result, _Mod) -> + Result. + +do_wildcard_3(Base, [Pattern|Rest], Result, Mod) -> + case do_list_dir(Base, Mod) of + {ok, Files0} -> + Files = lists:sort(Files0), + Matches = wildcard_4(Pattern, Files, Base, []), + do_wildcard_2(Matches, Rest, Result, Mod); + _ -> + Result + end; +do_wildcard_3(Base, [], Result, _Mod) -> + [Base|Result]. + +wildcard_4(Pattern, [File|Rest], Base, Result) when is_binary(File) -> + case wildcard_5(Pattern, binary_to_list(File)) of + true -> + wildcard_4(Pattern, Rest, Base, [join(Base, File)|Result]); + false -> + wildcard_4(Pattern, Rest, Base, Result) + end; +wildcard_4(Pattern, [File|Rest], Base, Result) -> + case wildcard_5(Pattern, File) of + true -> + wildcard_4(Pattern, Rest, Base, [join(Base, File)|Result]); + false -> + wildcard_4(Pattern, Rest, Base, Result) + end; +wildcard_4(_Patt, [], _Base, Result) -> + Result. + +wildcard_5([question|Rest1], [_|Rest2]) -> + wildcard_5(Rest1, Rest2); +wildcard_5([accept], _) -> + true; +wildcard_5([star|Rest], File) -> + do_star(Rest, File); +wildcard_5([{one_of, Ordset}|Rest], [C|File]) -> + case ordsets:is_element(C, Ordset) of + true -> wildcard_5(Rest, File); + false -> false + end; +wildcard_5([{alt, Alts}], File) -> + do_alt(Alts, File); +wildcard_5([C|Rest1], [C|Rest2]) when is_integer(C) -> + wildcard_5(Rest1, Rest2); +wildcard_5([X|_], [Y|_]) when is_integer(X), is_integer(Y) -> + false; +wildcard_5([], []) -> + true; +wildcard_5([], [_|_]) -> + false; +wildcard_5([_|_], []) -> + false. + +do_star(Pattern, [X|Rest]) -> + case wildcard_5(Pattern, [X|Rest]) of + true -> true; + false -> do_star(Pattern, Rest) + end; +do_star(Pattern, []) -> + wildcard_5(Pattern, []). + +do_alt([Alt|Rest], File) -> + case wildcard_5(Alt, File) of + true -> true; + false -> do_alt(Rest, File) + end; +do_alt([], _File) -> + false. + +do_list_dir(current, Mod) -> eval_list_dir(".", Mod); +do_list_dir(Dir, Mod) -> eval_list_dir(Dir, Mod). + +join(current, File) -> File; +join(Base, File) -> filename:join(Base, File). + + +%%% Compiling a wildcard. + +compile_wildcard(Pattern) -> + ?HANDLE_ERROR(do_compile_wildcard(Pattern)). + +do_compile_wildcard(Pattern) -> + {compiled_wildcard,compile_wildcard_1(Pattern)}. + +compile_wildcard_1(Pattern) -> + [Root|Rest] = filename:split(Pattern), + case filename:pathtype(Root) of + relative -> + compile_wildcard_2([Root|Rest], current); + _ -> + compile_wildcard_2(Rest, [Root]) + end. + +compile_wildcard_2([Part|Rest], Root) -> + case compile_part(Part) of + Part -> + compile_wildcard_2(Rest, join(Root, Part)); + Pattern -> + compile_wildcard_3(Rest, [Pattern,Root]) + end; +compile_wildcard_2([], Root) -> {exists,Root}. + +compile_wildcard_3([Part|Rest], Result) -> + compile_wildcard_3(Rest, [compile_part(Part)|Result]); +compile_wildcard_3([], Result) -> + lists:reverse(Result). + +compile_part(Part) -> + compile_part(Part, false, []). + +compile_part_to_sep(Part) -> + compile_part(Part, true, []). + +compile_part([], true, _) -> + error(missing_delimiter); +compile_part([$,|Rest], true, Result) -> + {ok, $,, lists:reverse(Result), Rest}; +compile_part([$}|Rest], true, Result) -> + {ok, $}, lists:reverse(Result), Rest}; +compile_part([$?|Rest], Upto, Result) -> + compile_part(Rest, Upto, [question|Result]); +compile_part([$*], Upto, Result) -> + compile_part([], Upto, [accept|Result]); +compile_part([$*|Rest], Upto, Result) -> + compile_part(Rest, Upto, [star|Result]); +compile_part([$[|Rest], Upto, Result) -> + case compile_charset(Rest, ordsets:new()) of + {ok, Charset, Rest1} -> + compile_part(Rest1, Upto, [Charset|Result]); + error -> + compile_part(Rest, Upto, [$[|Result]) + end; +compile_part([${|Rest], Upto, Result) -> + case compile_alt(Rest) of + {ok, Alt} -> + lists:reverse(Result, [Alt]); + error -> + compile_part(Rest, Upto, [${|Result]) + end; +compile_part([X|Rest], Upto, Result) -> + compile_part(Rest, Upto, [X|Result]); +compile_part([], _Upto, Result) -> + lists:reverse(Result). + +compile_charset([$]|Rest], Ordset) -> + compile_charset1(Rest, ordsets:add_element($], Ordset)); +compile_charset([$-|Rest], Ordset) -> + compile_charset1(Rest, ordsets:add_element($-, Ordset)); +compile_charset([], _Ordset) -> + error; +compile_charset(List, Ordset) -> + compile_charset1(List, Ordset). + +compile_charset1([Lower, $-, Upper|Rest], Ordset) when Lower =< Upper -> + compile_charset1(Rest, compile_range(Lower, Upper, Ordset)); +compile_charset1([$]|Rest], Ordset) -> + {ok, {one_of, Ordset}, Rest}; +compile_charset1([X|Rest], Ordset) -> + compile_charset1(Rest, ordsets:add_element(X, Ordset)); +compile_charset1([], _Ordset) -> + error. + +compile_range(Lower, Current, Ordset) when Lower =< Current -> + compile_range(Lower, Current-1, ordsets:add_element(Current, Ordset)); +compile_range(_, _, Ordset) -> + Ordset. + +compile_alt(Pattern) -> + compile_alt(Pattern, []). + +compile_alt(Pattern, Result) -> + case compile_part_to_sep(Pattern) of + {ok, $,, AltPattern, Rest} -> + compile_alt(Rest, [AltPattern|Result]); + {ok, $}, AltPattern, Rest} -> + NewResult = [AltPattern|Result], + RestPattern = compile_part(Rest), + {ok, {alt, [Alt++RestPattern || Alt <- NewResult]}}; + Pattern -> + error + end. + +error(Reason) -> + erlang:error({badpattern,Reason}). + +eval_read_file_info(File, file) -> + file2:read_file_info(File); +eval_read_file_info(File, erl_prim_loader) -> + case erl_prim_loader:read_file_info(File) of + error -> {error, erl_prim_loader}; + Res-> Res + end; +eval_read_file_info(File, Mod) -> + Mod:read_file_info(File). + +eval_list_dir(Dir, file) -> + file2:list_dir(Dir); +eval_list_dir(Dir, erl_prim_loader) -> + case erl_prim_loader:list_dir(Dir) of + error -> {error, erl_prim_loader}; + Res-> Res + end; +eval_list_dir(Dir, Mod) -> + Mod:list_dir(Dir). diff --git a/src/io_runner.erl b/src/io_runner.erl new file mode 100644 index 00000000..8a252158 --- /dev/null +++ b/src/io_runner.erl @@ -0,0 +1,83 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% + +-module(io_runner). + +-behaviour(gen_server2). + +-export([start_link/0, submit/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). + +submit(Fun) -> + %% If the io_runner is not running, just run the Fun in the + %% current process. + case whereis(?SERVER) of + undefined -> run(Fun); + _ -> gen_server2:call(?SERVER, {run, Fun}, infinity) + end. + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, nostate, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({run, Fun}, _From, State) -> + {reply, run(Fun), State, hibernate}; +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +run({M, F, A}) -> + apply(M, F, A); +run(Fun) -> + Fun(). diff --git a/src/rabbit.erl b/src/rabbit.erl index b8dbccc7..ffa40583 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -58,6 +58,12 @@ {requires, pre_boot}, {enables, external_infrastructure}]}). +-rabbit_boot_step({io_runner, + [{description, "IO runner"}, + {mfa, {rabbit_sup, start_child, [io_runner]}}, + {requires, pre_boot}, + {enables, external_infrastructure}]}). + -rabbit_boot_step({external_infrastructure, [{description, "external infrastructure ready"}]}). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0b39a209..3765da85 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -526,7 +526,7 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> dirty_dump_log1(LH, disk_log:chunk(LH, K)). -read_term_file(File) -> file:consult(File). +read_term_file(File) -> file2:consult(File). write_term_file(File, Terms) -> write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || @@ -544,12 +544,12 @@ write_file(Path, Data, Modes) -> Modes1 = [binary, write | (Modes -- [binary, write])], case make_binary(Data) of Bin when is_binary(Bin) -> - case file:open(Path, Modes1) of - {ok, Hdl} -> try file:write(Hdl, Bin) of - ok -> file:sync(Hdl); + case file2:open(Path, Modes1) of + {ok, Hdl} -> try file2:write(Hdl, Bin) of + ok -> file2:sync(Hdl); {error, _} = E -> E after - file:close(Hdl) + file2:close(Hdl) end; {error, _} = E -> E end; @@ -567,7 +567,7 @@ make_binary(List) -> append_file(File, Suffix) -> - case file:read_file_info(File) of + case file2:read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); {error, enoent} -> append_file(File, 0, Suffix); Error -> Error @@ -576,18 +576,18 @@ append_file(File, Suffix) -> append_file(_, _, "") -> ok; append_file(File, 0, Suffix) -> - case file:open([File, Suffix], [append]) of - {ok, Fd} -> file:close(Fd); + case file2:open([File, Suffix], [append]) of + {ok, Fd} -> file2:close(Fd); Error -> Error end; append_file(File, _, Suffix) -> - case file:read_file(File) of + case file2:read_file(File) of {ok, Data} -> write_file([File, Suffix], Data, [append]); Error -> Error end. ensure_parent_dirs_exist(Filename) -> - case filelib:ensure_dir(Filename) of + case filelib2:ensure_dir(Filename) of ok -> ok; {error, Reason} -> throw({error, {cannot_create_parent_dirs, Filename, Reason}}) @@ -749,13 +749,13 @@ recursive_delete(Files) -> end, ok, Files). recursive_delete1(Path) -> - case filelib:is_dir(Path) and not(is_symlink(Path)) of - false -> case file:delete(Path) of + case filelib2:is_dir(Path) and not(is_symlink(Path)) of + false -> case file2:delete(Path) of ok -> ok; {error, enoent} -> ok; %% Path doesn't exist anyway {error, Err} -> {error, {Path, Err}} end; - true -> case file:list_dir(Path) of + true -> case file2:list_dir(Path) of {ok, FileNames} -> case lists:foldl( fun (FileName, ok) -> @@ -765,7 +765,7 @@ recursive_delete1(Path) -> Error end, ok, FileNames) of ok -> - case file:del_dir(Path) of + case file2:del_dir(Path) of ok -> ok; {error, Err} -> {error, {Path, Err}} end; @@ -784,15 +784,15 @@ is_symlink(Name) -> end. recursive_copy(Src, Dest) -> - case filelib:is_dir(Src) of - false -> case file:copy(Src, Dest) of + case filelib2:is_dir(Src) of + false -> case file2:copy(Src, Dest) of {ok, _Bytes} -> ok; {error, enoent} -> ok; %% Path doesn't exist anyway {error, Err} -> {error, {Src, Dest, Err}} end; - true -> case file:list_dir(Src) of + true -> case file2:list_dir(Src) of {ok, FileNames} -> - case file:make_dir(Dest) of + case file2:make_dir(Dest) of ok -> lists:foldl( fun (FileName, ok) -> @@ -902,10 +902,10 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) -> %% TODO: When we stop supporting Erlang prior to R14, this should be %% replaced with file:open [write, exclusive] lock_file(Path) -> - case filelib:is_file(Path) of + case filelib2:is_file(Path) of true -> {error, eexist}; - false -> {ok, Lock} = file:open(Path, [write]), - ok = file:close(Lock) + false -> {ok, Lock} = file2:open(Path, [write]), + ok = file2:close(Lock) end. const_ok() -> ok. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 636913b5..2d8f6354 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -229,7 +229,7 @@ init(Name, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), - false = filelib:is_file(Dir), %% is_file == is file or dir + false = filelib2:is_file(Dir), %% is_file == is file or dir State #qistate { on_sync = OnSyncFun }. shutdown_terms(Name) -> @@ -366,9 +366,9 @@ recover(DurableQueues) -> {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. all_queue_directory_names(Dir) -> - case file:list_dir(Dir) of + case file2:list_dir(Dir) of {ok, Entries} -> [ Entry || Entry <- Entries, - filelib:is_dir( + filelib2:is_dir( filename:join(Dir, Entry)) ]; {error, enoent} -> [] end. @@ -392,7 +392,7 @@ blank_state(QueueName) -> clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). detect_clean_shutdown(Dir) -> - case file:delete(clean_file_name(Dir)) of + case file2:delete(clean_file_name(Dir)) of ok -> true; {error, enoent} -> false end. @@ -402,7 +402,7 @@ read_shutdown_terms(Dir) -> store_clean_shutdown(Terms, Dir) -> CleanFileName = clean_file_name(Dir), - ok = filelib:ensure_dir(CleanFileName), + ok = filelib2:ensure_dir(CleanFileName), rabbit_misc:write_term_file(CleanFileName, Terms). init_clean(RecoveredCounts, State) -> @@ -603,8 +603,8 @@ flush_journal(State = #qistate { segments = Segments }) -> Segments1 = segment_fold( fun (#segment { unacked = 0, path = Path }, SegmentsN) -> - case filelib:is_file(Path) of - true -> ok = file:delete(Path); + case filelib2:is_file(Path) of + true -> ok = file2:delete(Path); false -> ok end, SegmentsN; @@ -630,7 +630,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries, get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), - ok = filelib:ensure_dir(Path), + ok = filelib2:ensure_dir(Path), {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; @@ -735,7 +735,7 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) -> lists:takewhile(fun (C) -> $0 =< C andalso C =< $9 end, SegName)), Set) end, sets:from_list(segment_nums(Segments)), - filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))). + filelib2:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))). segment_find_or_new(Seg, Dir, Segments) -> case segment_find(Seg, Segments) of @@ -836,7 +836,7 @@ segment_entries_foldr(Fun, Init, %% %% Does not do any combining with the journal at all. load_segment(KeepAcked, #segment { path = Path }) -> - case filelib:is_file(Path) of + case filelib2:is_file(Path) of false -> {array_new(), 0}; true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), @@ -1040,12 +1040,12 @@ foreach_queue_index(Funs) -> transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun), [ok = transform_file(filename:join(Dir, Seg), SegmentFun) - || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)], + || Seg <- filelib2:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)], ok = gatherer:finish(Gatherer). transform_file(Path, Fun) -> PathTmp = Path ++ ".upgrade", - case filelib:file_size(Path) of + case filelib2:file_size(Path) of 0 -> ok; Size -> {ok, PathTmpHdl} = file_handle_cache:open(PathTmp, ?WRITE_MODE, @@ -1059,7 +1059,7 @@ transform_file(Path, Fun) -> ok = drive_transform_fun(Fun, PathTmpHdl, Content), ok = file_handle_cache:close(PathTmpHdl), - ok = file:rename(PathTmp, Path) + ok = file2:rename(PathTmp, Path) end. drive_transform_fun(Fun, Hdl, Contents) -> |