summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-13 10:34:38 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-13 10:34:38 +0100
commit78b203937fd180fe1665f149586c78e9033d5989 (patch)
tree952307bcd71f7378da50cfe3e1da8d050706be52
parent6a5a56ea911e68f739f2759139c4dc0a93230cba (diff)
parent0902083d9e9b39ecbaf9c38a5c2861bcd54faa07 (diff)
downloadrabbitmq-server-78b203937fd180fe1665f149586c78e9033d5989.tar.gz
merge default into bug24386
-rw-r--r--src/file2.erl946
-rw-r--r--src/file_handle_cache.erl24
-rw-r--r--src/filelib2.erl470
-rw-r--r--src/io_runner.erl83
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_misc.erl42
-rw-r--r--src/rabbit_queue_index.erl26
7 files changed, 1551 insertions, 46 deletions
diff --git a/src/file2.erl b/src/file2.erl
new file mode 100644
index 00000000..d9ee1da8
--- /dev/null
+++ b/src/file2.erl
@@ -0,0 +1,946 @@
+%% This is a version of 'file' from R14B03, which runs calls to
+%% file_server2 thorugh the worker_pool and uses the file_name/1
+%% function from R12B-5. Use this module when you expect a large
+%% number of concurrent file operations.
+
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(file2).
+
+%% Interface module for the file server and the file io servers.
+
+
+
+%%% External exports
+
+-export([format_error/1]).
+%% File system and metadata.
+-export([get_cwd/0, get_cwd/1, set_cwd/1, delete/1, rename/2,
+ make_dir/1, del_dir/1, list_dir/1,
+ read_file_info/1, write_file_info/2,
+ altname/1,
+ read_link_info/1, read_link/1,
+ make_link/2, make_symlink/2,
+ read_file/1, write_file/2, write_file/3]).
+%% Specialized
+-export([ipread_s32bu_p32bu/3]).
+%% Generic file contents.
+-export([open/2, close/1, advise/4,
+ read/2, write/2,
+ pread/2, pread/3, pwrite/2, pwrite/3,
+ read_line/1,
+ position/2, truncate/1, datasync/1, sync/1,
+ copy/2, copy/3]).
+%% High level operations
+-export([consult/1, path_consult/2]).
+-export([eval/1, eval/2, path_eval/2, path_eval/3, path_open/3]).
+-export([script/1, script/2, path_script/2, path_script/3]).
+-export([change_owner/2, change_owner/3, change_group/2,
+ change_mode/2, change_time/2, change_time/3]).
+
+-export([pid2name/1]).
+
+%%% Obsolete exported functions
+
+-export([raw_read_file_info/1, raw_write_file_info/2]).
+
+%% Internal export to prim_file and ram_file until they implement
+%% an efficient copy themselves.
+-export([copy_opened/3]).
+
+-export([ipread_s32bu_p32bu_int/3]).
+
+%%% Includes and defines
+-include_lib("kernel/include/file.hrl").
+
+-define(FILE_IO_SERVER_TABLE, file_io_servers).
+
+-define(FILE_SERVER, file_server_2). % Registered name
+-define(PRIM_FILE, prim_file). % Module
+-define(RAM_FILE, ram_file). % Module
+
+%% data types
+-type deep_list() :: [char() | atom() | deep_list()].
+
+%%%-----------------------------------------------------------------
+%%% General functions
+
+format_error({_Line, ?MODULE, undefined_script}) ->
+ "no value returned from script";
+format_error({Line, ?MODULE, {Class, Reason, Stacktrace}}) ->
+ io_lib:format("~w: evaluation failed with reason ~w:~w and stacktrace ~w",
+ [Line, Class, Reason, Stacktrace]);
+format_error({Line, ?MODULE, {Reason, Stacktrace}}) ->
+ io_lib:format("~w: evaluation failed with reason ~w and stacktrace ~w",
+ [Line, Reason, Stacktrace]);
+format_error({Line, Mod, Reason}) ->
+ io_lib:format("~w: ~s", [Line, Mod:format_error(Reason)]);
+format_error(badarg) ->
+ "bad argument";
+format_error(system_limit) ->
+ "a system limit was hit, probably not enough ports";
+format_error(terminated) ->
+ "the file server process is terminated";
+format_error(ErrorId) ->
+ erl_posix_msg:message(ErrorId).
+
+pid2name(Pid) when is_pid(Pid) ->
+ case whereis(?FILE_SERVER) of
+ undefined ->
+ undefined;
+ _ ->
+ case ets:lookup(?FILE_IO_SERVER_TABLE, Pid) of
+ [{_, Name} | _] ->
+ {ok, Name};
+ _ ->
+ undefined
+ end
+ end.
+
+%%%-----------------------------------------------------------------
+%%% File server functions.
+%%% Functions that do not operate on a single open file.
+%%% Stateless.
+get_cwd() ->
+ call(get_cwd, []).
+
+get_cwd(Drive) ->
+ check_and_call(get_cwd, [file_name(Drive)]).
+
+set_cwd(Dirname) ->
+ check_and_call(set_cwd, [file_name(Dirname)]).
+
+delete(Name) ->
+ check_and_call(delete, [file_name(Name)]).
+
+rename(From, To) ->
+ check_and_call(rename, [file_name(From), file_name(To)]).
+
+make_dir(Name) ->
+ check_and_call(make_dir, [file_name(Name)]).
+
+del_dir(Name) ->
+ check_and_call(del_dir, [file_name(Name)]).
+
+read_file_info(Name) ->
+ check_and_call(read_file_info, [file_name(Name)]).
+
+altname(Name) ->
+ check_and_call(altname, [file_name(Name)]).
+
+read_link_info(Name) ->
+ check_and_call(read_link_info, [file_name(Name)]).
+
+read_link(Name) ->
+ check_and_call(read_link, [file_name(Name)]).
+
+write_file_info(Name, Info = #file_info{}) ->
+ check_and_call(write_file_info, [file_name(Name), Info]).
+
+list_dir(Name) ->
+ check_and_call(list_dir, [file_name(Name)]).
+
+read_file(Name) ->
+ check_and_call(read_file, [file_name(Name)]).
+
+make_link(Old, New) ->
+ check_and_call(make_link, [file_name(Old), file_name(New)]).
+
+make_symlink(Old, New) ->
+ check_and_call(make_symlink, [file_name(Old), file_name(New)]).
+
+write_file(Name, Bin) ->
+ check_and_call(write_file, [file_name(Name), make_binary(Bin)]).
+
+%% This whole operation should be moved to the file_server and prim_file
+%% when it is time to change file server protocol again.
+%% Meanwhile, it is implemented here, slightly less efficient.
+
+write_file(Name, Bin, ModeList) when is_list(ModeList) ->
+ case make_binary(Bin) of
+ B when is_binary(B) ->
+ case open(Name, [binary, write |
+ lists:delete(binary,
+ lists:delete(write, ModeList))]) of
+ {ok, Handle} ->
+ case write(Handle, B) of
+ ok ->
+ close(Handle);
+ E1 ->
+ close(Handle),
+ E1
+ end;
+ E2 ->
+ E2
+ end;
+ E3 ->
+ E3
+ end.
+
+%% Obsolete, undocumented, local node only, don't use!.
+%% XXX to be removed.
+raw_read_file_info(Name) ->
+ Args = [file_name(Name)],
+ case check_args(Args) of
+ ok ->
+ [FileName] = Args,
+ ?PRIM_FILE:read_file_info(FileName);
+ Error ->
+ Error
+ end.
+
+%% Obsolete, undocumented, local node only, don't use!.
+%% XXX to be removed.
+raw_write_file_info(Name, #file_info{} = Info) ->
+ Args = [file_name(Name)],
+ case check_args(Args) of
+ ok ->
+ [FileName] = Args,
+ ?PRIM_FILE:write_file_info(FileName, Info);
+ Error ->
+ Error
+ end.
+
+%%%-----------------------------------------------------------------
+%%% File io server functions.
+%%% They operate on a single open file.
+%%% Stateful.
+
+%% Contemporary mode specification - list of options
+
+open(Item, ModeList) when is_list(ModeList) ->
+ case lists:member(raw, ModeList) of
+ %% Raw file, use ?PRIM_FILE to handle this file
+ true ->
+ %% check if raw file mode is disabled
+ case catch application:get_env(kernel, raw_files) of
+ {ok,false} ->
+ open(Item, lists:delete(raw, ModeList));
+ _ -> % undefined | {ok,true}
+ Args = [file_name(Item) | ModeList],
+ case check_args(Args) of
+ ok ->
+ [FileName | _] = Args,
+ %% We rely on the returned Handle (in {ok, Handle})
+ %% being a pid() or a #file_descriptor{}
+ ?PRIM_FILE:open(FileName, ModeList);
+ Error ->
+ Error
+ end
+ end;
+ false ->
+ case lists:member(ram, ModeList) of
+ %% RAM file, use ?RAM_FILE to handle this file
+ true ->
+ case check_args(ModeList) of
+ ok ->
+ ?RAM_FILE:open(Item, ModeList);
+ Error ->
+ Error
+ end;
+ %% File server file
+ false ->
+ Args = [file_name(Item) | ModeList],
+ case check_args(Args) of
+ ok ->
+ [FileName | _] = Args,
+ call(open, [FileName, ModeList]);
+ Error ->
+ Error
+ end
+ end
+ end;
+%% Old obsolete mode specification in atom or 2-tuple format
+open(Item, Mode) ->
+ open(Item, mode_list(Mode)).
+
+%%%-----------------------------------------------------------------
+%%% The following interface functions operate on open files.
+%%% The File argument must be either a Pid or a handle
+%%% returned from ?PRIM_FILE:open.
+
+close(File) when is_pid(File) ->
+ R = file_request(File, close),
+ case wait_file_reply(File, R) of
+ {error, terminated} ->
+ ok;
+ Other ->
+ Other
+ end;
+%% unlink(File),
+%% exit(File, close),
+%% ok;
+close(#file_descriptor{module = Module} = Handle) ->
+ Module:close(Handle);
+close(_) ->
+ {error, badarg}.
+
+advise(File, Offset, Length, Advise) when is_pid(File) ->
+ R = file_request(File, {advise, Offset, Length, Advise}),
+ wait_file_reply(File, R);
+advise(#file_descriptor{module = Module} = Handle, Offset, Length, Advise) ->
+ Module:advise(Handle, Offset, Length, Advise);
+advise(_, _, _, _) ->
+ {error, badarg}.
+
+read(File, Sz) when (is_pid(File) orelse is_atom(File)), is_integer(Sz), Sz >= 0 ->
+ case io:request(File, {get_chars, '', Sz}) of
+ Data when is_list(Data); is_binary(Data) ->
+ {ok, Data};
+ Other ->
+ Other
+ end;
+read(#file_descriptor{module = Module} = Handle, Sz)
+ when is_integer(Sz), Sz >= 0 ->
+ Module:read(Handle, Sz);
+read(_, _) ->
+ {error, badarg}.
+
+read_line(File) when (is_pid(File) orelse is_atom(File)) ->
+ case io:request(File, {get_line, ''}) of
+ Data when is_list(Data); is_binary(Data) ->
+ {ok, Data};
+ Other ->
+ Other
+ end;
+read_line(#file_descriptor{module = Module} = Handle) ->
+ Module:read_line(Handle);
+read_line(_) ->
+ {error, badarg}.
+
+pread(File, L) when is_pid(File), is_list(L) ->
+ pread_int(File, L, []);
+pread(#file_descriptor{module = Module} = Handle, L) when is_list(L) ->
+ Module:pread(Handle, L);
+pread(_, _) ->
+ {error, badarg}.
+
+pread_int(_File, [], R) ->
+ {ok, lists:reverse(R)};
+pread_int(File, [{At, Sz} | T], R) when is_integer(Sz), Sz >= 0 ->
+ case pread(File, At, Sz) of
+ {ok, Data} ->
+ pread_int(File, T, [Data | R]);
+ eof ->
+ pread_int(File, T, [eof | R]);
+ {error, _} = Error ->
+ Error
+ end;
+pread_int(_, _, _) ->
+ {error, badarg}.
+
+pread(File, At, Sz) when is_pid(File), is_integer(Sz), Sz >= 0 ->
+ R = file_request(File, {pread, At, Sz}),
+ wait_file_reply(File, R);
+pread(#file_descriptor{module = Module} = Handle, Offs, Sz)
+ when is_integer(Sz), Sz >= 0 ->
+ Module:pread(Handle, Offs, Sz);
+pread(_, _, _) ->
+ {error, badarg}.
+
+write(File, Bytes) when (is_pid(File) orelse is_atom(File)) ->
+ case make_binary(Bytes) of
+ Bin when is_binary(Bin) ->
+ io:request(File, {put_chars,Bin});
+ Error ->
+ Error
+ end;
+write(#file_descriptor{module = Module} = Handle, Bytes) ->
+ Module:write(Handle, Bytes);
+write(_, _) ->
+ {error, badarg}.
+
+pwrite(File, L) when is_pid(File), is_list(L) ->
+ pwrite_int(File, L, 0);
+pwrite(#file_descriptor{module = Module} = Handle, L) when is_list(L) ->
+ Module:pwrite(Handle, L);
+pwrite(_, _) ->
+ {error, badarg}.
+
+pwrite_int(_File, [], _R) ->
+ ok;
+pwrite_int(File, [{At, Bytes} | T], R) ->
+ case pwrite(File, At, Bytes) of
+ ok ->
+ pwrite_int(File, T, R+1);
+ {error, Reason} ->
+ {error, {R, Reason}}
+ end;
+pwrite_int(_, _, _) ->
+ {error, badarg}.
+
+pwrite(File, At, Bytes) when is_pid(File) ->
+ R = file_request(File, {pwrite, At, Bytes}),
+ wait_file_reply(File, R);
+pwrite(#file_descriptor{module = Module} = Handle, Offs, Bytes) ->
+ Module:pwrite(Handle, Offs, Bytes);
+pwrite(_, _, _) ->
+ {error, badarg}.
+
+datasync(File) when is_pid(File) ->
+ R = file_request(File, datasync),
+ wait_file_reply(File, R);
+datasync(#file_descriptor{module = Module} = Handle) ->
+ Module:datasync(Handle);
+datasync(_) ->
+ {error, badarg}.
+
+sync(File) when is_pid(File) ->
+ R = file_request(File, sync),
+ wait_file_reply(File, R);
+sync(#file_descriptor{module = Module} = Handle) ->
+ Module:sync(Handle);
+sync(_) ->
+ {error, badarg}.
+
+position(File, At) when is_pid(File) ->
+ R = file_request(File, {position,At}),
+ wait_file_reply(File, R);
+position(#file_descriptor{module = Module} = Handle, At) ->
+ Module:position(Handle, At);
+position(_, _) ->
+ {error, badarg}.
+
+truncate(File) when is_pid(File) ->
+ R = file_request(File, truncate),
+ wait_file_reply(File, R);
+truncate(#file_descriptor{module = Module} = Handle) ->
+ Module:truncate(Handle);
+truncate(_) ->
+ {error, badarg}.
+
+copy(Source, Dest) ->
+ copy_int(Source, Dest, infinity).
+
+copy(Source, Dest, Length)
+ when is_integer(Length), Length >= 0;
+ is_atom(Length) ->
+ copy_int(Source, Dest, Length);
+copy(_, _, _) ->
+ {error, badarg}.
+
+%% Here we know that Length is either an atom or an integer >= 0
+%% (by the way, atoms > integers)
+%%
+%% Copy between open files.
+copy_int(Source, Dest, Length)
+ when is_pid(Source), is_pid(Dest);
+ is_pid(Source), is_record(Dest, file_descriptor);
+ is_record(Source, file_descriptor), is_pid(Dest) ->
+ copy_opened_int(Source, Dest, Length, 0);
+%% Copy between open raw files, both handled by the same module
+copy_int(#file_descriptor{module = Module} = Source,
+ #file_descriptor{module = Module} = Dest,
+ Length) ->
+ Module:copy(Source, Dest, Length);
+%% Copy between open raw files of different modules
+copy_int(#file_descriptor{} = Source,
+ #file_descriptor{} = Dest, Length) ->
+ copy_opened_int(Source, Dest, Length, 0);
+%% Copy between filenames, let the server do the copy
+copy_int({SourceName, SourceOpts}, {DestName, DestOpts}, Length)
+ when is_list(SourceOpts), is_list(DestOpts) ->
+ check_and_call(copy,
+ [file_name(SourceName), SourceOpts,
+ file_name(DestName), DestOpts,
+ Length]);
+%% Filename -> open file; must open Source and do client copy
+copy_int({SourceName, SourceOpts}, Dest, Length)
+ when is_list(SourceOpts), is_pid(Dest);
+ is_list(SourceOpts), is_record(Dest, file_descriptor) ->
+ case file_name(SourceName) of
+ {error, _} = Error ->
+ Error;
+ Source ->
+ case open(Source, [read | SourceOpts]) of
+ {ok, Handle} ->
+ Result = copy_opened_int(Handle, Dest, Length, 0),
+ close(Handle),
+ Result;
+ {error, _} = Error ->
+ Error
+ end
+ end;
+%% Open file -> filename; must open Dest and do client copy
+copy_int(Source, {DestName, DestOpts}, Length)
+ when is_pid(Source), is_list(DestOpts);
+ is_record(Source, file_descriptor), is_list(DestOpts) ->
+ case file_name(DestName) of
+ {error, _} = Error ->
+ Error;
+ Dest ->
+ case open(Dest, [write | DestOpts]) of
+ {ok, Handle} ->
+ Result = copy_opened_int(Source, Handle, Length, 0),
+ close(Handle),
+ Result;
+ {error, _} = Error ->
+ Error
+ end
+ end;
+%%
+%% That was all combinations of {Name, Opts} tuples
+%% and open files. At least one of Source and Dest has
+%% to be a bare filename.
+%%
+%% If Source is not a bare filename; Dest must be
+copy_int(Source, Dest, Length)
+ when is_pid(Source);
+ is_record(Source, file_descriptor) ->
+ copy_int(Source, {Dest, []}, Length);
+copy_int({_SourceName, SourceOpts} = Source, Dest, Length)
+ when is_list(SourceOpts) ->
+ copy_int(Source, {Dest, []}, Length);
+%% If Dest is not a bare filename; Source must be
+copy_int(Source, Dest, Length)
+ when is_pid(Dest);
+ is_record(Dest, file_descriptor) ->
+ copy_int({Source, []}, Dest, Length);
+copy_int(Source, {_DestName, DestOpts} = Dest, Length)
+ when is_list(DestOpts) ->
+ copy_int({Source, []}, Dest, Length);
+%% Both must be bare filenames. If they are not,
+%% the filename check in the copy operation will yell.
+copy_int(Source, Dest, Length) ->
+ copy_int({Source, []}, {Dest, []}, Length).
+
+
+
+copy_opened(Source, Dest, Length)
+ when is_integer(Length), Length >= 0;
+ is_atom(Length) ->
+ copy_opened_int(Source, Dest, Length);
+copy_opened(_, _, _) ->
+ {error, badarg}.
+
+%% Here we know that Length is either an atom or an integer >= 0
+%% (by the way, atoms > integers)
+
+copy_opened_int(Source, Dest, Length)
+ when is_pid(Source), is_pid(Dest) ->
+ copy_opened_int(Source, Dest, Length, 0);
+copy_opened_int(Source, Dest, Length)
+ when is_pid(Source), is_record(Dest, file_descriptor) ->
+ copy_opened_int(Source, Dest, Length, 0);
+copy_opened_int(Source, Dest, Length)
+ when is_record(Source, file_descriptor), is_pid(Dest) ->
+ copy_opened_int(Source, Dest, Length, 0);
+copy_opened_int(Source, Dest, Length)
+ when is_record(Source, file_descriptor), is_record(Dest, file_descriptor) ->
+ copy_opened_int(Source, Dest, Length, 0);
+copy_opened_int(_, _, _) ->
+ {error, badarg}.
+
+%% Here we know that Source and Dest are handles to open files, Length is
+%% as above, and Copied is an integer >= 0
+
+%% Copy loop in client process
+copy_opened_int(_, _, Length, Copied) when Length =< 0 -> % atom() > integer()
+ {ok, Copied};
+copy_opened_int(Source, Dest, Length, Copied) ->
+ N = if Length > 65536 -> 65536; true -> Length end, % atom() > integer() !
+ case read(Source, N) of
+ {ok, Data} ->
+ M = if is_binary(Data) -> byte_size(Data);
+ is_list(Data) -> length(Data)
+ end,
+ case write(Dest, Data) of
+ ok ->
+ if M < N ->
+ %% Got less than asked for - must be end of file
+ {ok, Copied+M};
+ true ->
+ %% Decrement Length (might be an atom (infinity))
+ NewLength = if is_atom(Length) -> Length;
+ true -> Length-M
+ end,
+ copy_opened_int(Source, Dest, NewLength, Copied+M)
+ end;
+ {error, _} = Error ->
+ Error
+ end;
+ eof ->
+ {ok, Copied};
+ {error, _} = Error ->
+ Error
+ end.
+
+
+%% Special indirect pread function. Introduced for Dets.
+%% Reads a header from pos 'Pos', the header is first a size encoded as
+%% 32 bit big endian unsigned and then a position also encoded as
+%% 32 bit big endian. Finally it preads the data from that pos and size
+%% in the file.
+
+ipread_s32bu_p32bu(File, Pos, MaxSize) when is_pid(File) ->
+ ipread_s32bu_p32bu_int(File, Pos, MaxSize);
+ipread_s32bu_p32bu(#file_descriptor{module = Module} = Handle, Pos, MaxSize) ->
+ Module:ipread_s32bu_p32bu(Handle, Pos, MaxSize);
+ipread_s32bu_p32bu(_, _, _) ->
+ {error, badarg}.
+
+ipread_s32bu_p32bu_int(File, Pos, Infinity) when is_atom(Infinity) ->
+ ipread_s32bu_p32bu_int(File, Pos, (1 bsl 31)-1);
+ipread_s32bu_p32bu_int(File, Pos, MaxSize)
+ when is_integer(MaxSize), MaxSize >= 0 ->
+ if
+ MaxSize < (1 bsl 31) ->
+ case pread(File, Pos, 8) of
+ {ok, Header} ->
+ ipread_s32bu_p32bu_2(File, Header, MaxSize);
+ Error ->
+ Error
+ end;
+ true ->
+ {error, einval}
+ end;
+ipread_s32bu_p32bu_int(_File, _Pos, _MaxSize) ->
+ {error, badarg}.
+
+ipread_s32bu_p32bu_2(_File,
+ <<0:32/big-unsigned, Pos:32/big-unsigned>>,
+ _MaxSize) ->
+ {ok, {0, Pos, eof}};
+ipread_s32bu_p32bu_2(File,
+ <<Size:32/big-unsigned, Pos:32/big-unsigned>>,
+ MaxSize)
+ when Size =< MaxSize ->
+ case pread(File, Pos, Size) of
+ {ok, Data} ->
+ {ok, {Size, Pos, Data}};
+ eof ->
+ {ok, {Size, Pos, eof}};
+ Error ->
+ Error
+ end;
+ipread_s32bu_p32bu_2(_File,
+ <<_:8/binary>>,
+ _MaxSize) ->
+ eof;
+ipread_s32bu_p32bu_2(_File,
+ <<_/binary>>,
+ _MaxSize) ->
+ eof;
+ipread_s32bu_p32bu_2(File,
+ Header,
+ MaxSize) when is_list(Header) ->
+ ipread_s32bu_p32bu_2(File, list_to_binary(Header), MaxSize).
+
+
+
+%%%-----------------------------------------------------------------
+%%% The following functions, built upon the other interface functions,
+%%% provide a higher-lever interface to files.
+
+consult(File) ->
+ case open(File, [read]) of
+ {ok, Fd} ->
+ R = consult_stream(Fd),
+ close(Fd),
+ R;
+ Error ->
+ Error
+ end.
+
+path_consult(Path, File) ->
+ case path_open(Path, File, [read]) of
+ {ok, Fd, Full} ->
+ case consult_stream(Fd) of
+ {ok, List} ->
+ close(Fd),
+ {ok, List, Full};
+ E1 ->
+ close(Fd),
+ E1
+ end;
+ E2 ->
+ E2
+ end.
+
+eval(File) ->
+ eval(File, erl_eval:new_bindings()).
+
+eval(File, Bs) ->
+ case open(File, [read]) of
+ {ok, Fd} ->
+ R = eval_stream(Fd, ignore, Bs),
+ close(Fd),
+ R;
+ Error ->
+ Error
+ end.
+
+path_eval(Path, File) ->
+ path_eval(Path, File, erl_eval:new_bindings()).
+
+path_eval(Path, File, Bs) ->
+ case path_open(Path, File, [read]) of
+ {ok, Fd, Full} ->
+ case eval_stream(Fd, ignore, Bs) of
+ ok ->
+ close(Fd),
+ {ok, Full};
+ E1 ->
+ close(Fd),
+ E1
+ end;
+ E2 ->
+ E2
+ end.
+
+script(File) ->
+ script(File, erl_eval:new_bindings()).
+
+script(File, Bs) ->
+ case open(File, [read]) of
+ {ok, Fd} ->
+ R = eval_stream(Fd, return, Bs),
+ close(Fd),
+ R;
+ Error ->
+ Error
+ end.
+
+path_script(Path, File) ->
+ path_script(Path, File, erl_eval:new_bindings()).
+
+path_script(Path, File, Bs) ->
+ case path_open(Path, File, [read]) of
+ {ok,Fd,Full} ->
+ case eval_stream(Fd, return, Bs) of
+ {ok,R} ->
+ close(Fd),
+ {ok, R, Full};
+ E1 ->
+ close(Fd),
+ E1
+ end;
+ E2 ->
+ E2
+ end.
+
+
+%% path_open(Paths, Filename, Mode) ->
+%% {ok,FileDescriptor,FullName}
+%% {error,Reason}
+%%
+%% Searches the Paths for file Filename which can be opened with Mode.
+%% The path list is ignored if Filename contains an absolute path.
+
+path_open(PathList, Name, Mode) ->
+ case file_name(Name) of
+ {error, _} = Error ->
+ Error;
+ FileName ->
+ case filename:pathtype(FileName) of
+ relative ->
+ path_open_first(PathList, FileName, Mode, enoent);
+ _ ->
+ case open(Name, Mode) of
+ {ok, Fd} ->
+ {ok, Fd, Name};
+ Error ->
+ Error
+ end
+ end
+ end.
+
+change_mode(Name, Mode)
+ when is_integer(Mode) ->
+ write_file_info(Name, #file_info{mode=Mode}).
+
+change_owner(Name, OwnerId)
+ when is_integer(OwnerId) ->
+ write_file_info(Name, #file_info{uid=OwnerId}).
+
+change_owner(Name, OwnerId, GroupId)
+ when is_integer(OwnerId), is_integer(GroupId) ->
+ write_file_info(Name, #file_info{uid=OwnerId, gid=GroupId}).
+
+change_group(Name, GroupId)
+ when is_integer(GroupId) ->
+ write_file_info(Name, #file_info{gid=GroupId}).
+
+change_time(Name, Time)
+ when is_tuple(Time) ->
+ write_file_info(Name, #file_info{mtime=Time}).
+
+change_time(Name, Atime, Mtime)
+ when is_tuple(Atime), is_tuple(Mtime) ->
+ write_file_info(Name, #file_info{atime=Atime, mtime=Mtime}).
+
+%%%-----------------------------------------------------------------
+%%% Helpers
+
+consult_stream(Fd) ->
+ consult_stream(Fd, 1, []).
+
+consult_stream(Fd, Line, Acc) ->
+ case io:read(Fd, '', Line) of
+ {ok,Term,EndLine} ->
+ consult_stream(Fd, EndLine, [Term|Acc]);
+ {error,Error,_Line} ->
+ {error,Error};
+ {eof,_Line} ->
+ {ok,lists:reverse(Acc)}
+ end.
+
+eval_stream(Fd, Handling, Bs) ->
+ eval_stream(Fd, Handling, 1, undefined, [], Bs).
+
+eval_stream(Fd, H, Line, Last, E, Bs) ->
+ eval_stream2(io:parse_erl_exprs(Fd, '', Line), Fd, H, Last, E, Bs).
+
+eval_stream2({ok,Form,EndLine}, Fd, H, Last, E, Bs0) ->
+ try erl_eval:exprs(Form, Bs0) of
+ {value,V,Bs} ->
+ eval_stream(Fd, H, EndLine, {V}, E, Bs)
+ catch Class:Reason ->
+ Error = {EndLine,?MODULE,{Class,Reason,erlang:get_stacktrace()}},
+ eval_stream(Fd, H, EndLine, Last, [Error|E], Bs0)
+ end;
+eval_stream2({error,What,EndLine}, Fd, H, Last, E, Bs) ->
+ eval_stream(Fd, H, EndLine, Last, [What | E], Bs);
+eval_stream2({eof,EndLine}, _Fd, H, Last, E, _Bs) ->
+ case {H, Last, E} of
+ {return, {Val}, []} ->
+ {ok, Val};
+ {return, undefined, E} ->
+ {error, hd(lists:reverse(E, [{EndLine,?MODULE,undefined_script}]))};
+ {ignore, _, []} ->
+ ok;
+ {_, _, [_|_] = E} ->
+ {error, hd(lists:reverse(E))}
+ end.
+
+path_open_first([Path|Rest], Name, Mode, LastError) ->
+ case file_name(Path) of
+ {error, _} = Error ->
+ Error;
+ FilePath ->
+ FileName = filename:join(FilePath, Name),
+ case open(FileName, Mode) of
+ {ok, Fd} ->
+ {ok, Fd, FileName};
+ {error, enoent} ->
+ path_open_first(Rest, Name, Mode, LastError);
+ Error ->
+ Error
+ end
+ end;
+path_open_first([], _Name, _Mode, LastError) ->
+ {error, LastError}.
+
+%%%-----------------------------------------------------------------
+%%% Utility functions.
+
+%% file_name(FileName)
+%% Generates a flat file name from a deep list of atoms and
+%% characters (integers).
+
+file_name(N) ->
+ try
+ file_name_1(N)
+ catch Reason ->
+ {error, Reason}
+ end.
+
+file_name_1([C|T]) when is_integer(C), C > 0, C =< 255 ->
+ [C|file_name_1(T)];
+file_name_1([H|T]) ->
+ file_name_1(H) ++ file_name_1(T);
+file_name_1([]) ->
+ [];
+file_name_1(N) when is_atom(N) ->
+ atom_to_list(N);
+file_name_1(_) ->
+ throw(badarg).
+
+make_binary(Bin) when is_binary(Bin) ->
+ Bin;
+make_binary(List) ->
+ %% Convert the list to a binary in order to avoid copying a list
+ %% to the file server.
+ try
+ erlang:iolist_to_binary(List)
+ catch error:Reason ->
+ {error, Reason}
+ end.
+
+mode_list(read) ->
+ [read];
+mode_list(write) ->
+ [write];
+mode_list(read_write) ->
+ [read, write];
+mode_list({binary, Mode}) when is_atom(Mode) ->
+ [binary | mode_list(Mode)];
+mode_list({character, Mode}) when is_atom(Mode) ->
+ mode_list(Mode);
+mode_list(_) ->
+ [{error, badarg}].
+
+%%-----------------------------------------------------------------
+%% Functions for communicating with the file server
+
+call(Command, Args) when is_list(Args) ->
+ io_runner:submit(
+ fun () ->
+ gen_server:call(?FILE_SERVER, list_to_tuple([Command | Args]),
+ infinity)
+ end).
+
+check_and_call(Command, Args) when is_list(Args) ->
+ case check_args(Args) of
+ ok ->
+ call(Command, Args);
+ Error ->
+ Error
+ end.
+
+check_args([{error, _}=Error|_Rest]) ->
+ Error;
+check_args([_Name|Rest]) ->
+ check_args(Rest);
+check_args([]) ->
+ ok.
+
+%%-----------------------------------------------------------------
+%% Functions for communicating with a file io server.
+%% The messages sent have the following formats:
+%%
+%% {file_request,From,ReplyAs,Request}
+%% {file_reply,ReplyAs,Reply}
+
+file_request(Io, Request) ->
+ R = erlang:monitor(process, Io),
+ Io ! {file_request,self(),Io,Request},
+ R.
+
+wait_file_reply(From, Ref) ->
+ receive
+ {file_reply,From,Reply} ->
+ erlang:demonitor(Ref),
+ receive {'DOWN', Ref, _, _, _} -> ok after 0 -> ok end,
+ %% receive {'EXIT', From, _} -> ok after 0 -> ok end,
+ Reply;
+ {'DOWN', Ref, _, _, _} ->
+ %% receive {'EXIT', From, _} -> ok after 0 -> ok end,
+ {error, terminated}
+ end.
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 3c2111dc..39be0e9b 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -317,7 +317,7 @@ read(Ref, Count) ->
fun ([#handle { is_read = false }]) ->
{error, not_open_for_reading};
([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
- case file:read(Hdl, Count) of
+ case file2:read(Hdl, Count) of
{ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
{Obj,
[Handle #handle { offset = Offset1 }]};
@@ -337,7 +337,7 @@ append(Ref, Data) ->
write_buffer_size_limit = 0,
at_eof = true } = Handle1} ->
Offset1 = Offset + iolist_size(Data),
- {file:write(Hdl, Data),
+ {file2:write(Hdl, Data),
[Handle1 #handle { is_dirty = true, offset = Offset1 }]};
{{ok, _Offset}, #handle { write_buffer = WriteBuffer,
write_buffer_size = Size,
@@ -364,7 +364,7 @@ sync(Ref) ->
ok;
([Handle = #handle { hdl = Hdl,
is_dirty = true, write_buffer = [] }]) ->
- case file:sync(Hdl) of
+ case file2:sync(Hdl) of
ok -> {ok, [Handle #handle { is_dirty = false }]};
Error -> {Error, [Handle]}
end
@@ -381,7 +381,7 @@ truncate(Ref) ->
with_flushed_handles(
[Ref],
fun ([Handle1 = #handle { hdl = Hdl }]) ->
- case file:truncate(Hdl) of
+ case file2:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
Error -> {Error, [Handle1]}
end
@@ -408,7 +408,7 @@ copy(Src, Dest, Count) ->
fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset },
DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
) ->
- case file:copy(SHdl, DHdl, Count) of
+ case file2:copy(SHdl, DHdl, Count) of
{ok, Count1} = Result1 ->
{Result1,
[SHandle #handle { offset = SOffset + Count1 },
@@ -428,7 +428,7 @@ delete(Ref) ->
Handle = #handle { path = Path } ->
case hard_close(Handle #handle { is_dirty = false,
write_buffer = [] }) of
- ok -> file:delete(Path);
+ ok -> file2:delete(Path);
{Error, Handle1} -> put_handle(Ref, Handle1),
Error
end
@@ -443,7 +443,7 @@ clear(Ref) ->
case maybe_seek(bof, Handle #handle { write_buffer = [],
write_buffer_size = 0 }) of
{{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
- case file:truncate(Hdl) of
+ case file2:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
Error -> {Error, [Handle1]}
end;
@@ -566,7 +566,7 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
offset = Offset,
last_used_at = undefined }} |
RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
- case file:open(Path, case NewOrReopen of
+ case file2:open(Path, case NewOrReopen of
new -> Mode;
reopen -> [read | Mode]
end) of
@@ -693,10 +693,10 @@ soft_close(Handle) ->
is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
- true -> file:sync(Hdl);
+ true -> file2:sync(Hdl);
false -> ok
end,
- ok = file:close(Hdl),
+ ok = file2:close(Hdl),
age_tree_delete(Then),
{ok, Handle1 #handle { hdl = closed,
is_dirty = false,
@@ -731,7 +731,7 @@ maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
at_eof = AtEoF }) ->
{AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
case (case NeedsSeek of
- true -> file:position(Hdl, NewOffset);
+ true -> file2:position(Hdl, NewOffset);
false -> {ok, Offset}
end) of
{ok, Offset1} = Result ->
@@ -768,7 +768,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
write_buffer = WriteBuffer,
write_buffer_size = DataSize,
at_eof = true }) ->
- case file:write(Hdl, lists:reverse(WriteBuffer)) of
+ case file2:write(Hdl, lists:reverse(WriteBuffer)) of
ok ->
Offset1 = Offset + DataSize,
{ok, Handle #handle { offset = Offset1, is_dirty = true,
diff --git a/src/filelib2.erl b/src/filelib2.erl
new file mode 100644
index 00000000..8fff0402
--- /dev/null
+++ b/src/filelib2.erl
@@ -0,0 +1,470 @@
+%% This is a version of 'filelib' from R14B03, which uses 'file2'
+%% instead of 'file'. Use this module when you expect a large number
+%% of concurrent file operations.
+
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1997-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+
+-module(filelib2).
+
+%% File utilities.
+
+%% Avoid warning for local function error/1 clashing with autoimported BIF.
+-compile({no_auto_import,[error/1]}).
+-export([wildcard/1, wildcard/2, is_dir/1, is_file/1, is_regular/1,
+ compile_wildcard/1]).
+-export([fold_files/5, last_modified/1, file_size/1, ensure_dir/1]).
+
+-export([wildcard/3, is_dir/2, is_file/2, is_regular/2]).
+-export([fold_files/6, last_modified/2, file_size/2]).
+
+-include_lib("kernel/include/file.hrl").
+
+-define(HANDLE_ERROR(Expr),
+ try
+ Expr
+ catch
+ error:{badpattern,_}=UnUsUalVaRiAbLeNaMe ->
+ %% Get the stack backtrace correct.
+ erlang:error(UnUsUalVaRiAbLeNaMe)
+ end).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+wildcard(Pattern) when is_list(Pattern) ->
+ ?HANDLE_ERROR(do_wildcard(Pattern, file)).
+
+wildcard(Pattern, Cwd) when is_list(Pattern), (is_list(Cwd) or is_binary(Cwd)) ->
+ ?HANDLE_ERROR(do_wildcard(Pattern, Cwd, file));
+wildcard(Pattern, Mod) when is_list(Pattern), is_atom(Mod) ->
+ ?HANDLE_ERROR(do_wildcard(Pattern, Mod)).
+
+-spec wildcard(file:name(), file:name(), atom()) -> [file:filename()].
+wildcard(Pattern, Cwd, Mod)
+ when is_list(Pattern), (is_list(Cwd) or is_binary(Cwd)), is_atom(Mod) ->
+ ?HANDLE_ERROR(do_wildcard(Pattern, Cwd, Mod)).
+
+is_dir(Dir) ->
+ do_is_dir(Dir, file).
+
+is_dir(Dir, Mod) when is_atom(Mod) ->
+ do_is_dir(Dir, Mod).
+
+is_file(File) ->
+ do_is_file(File, file).
+
+is_file(File, Mod) when is_atom(Mod) ->
+ do_is_file(File, Mod).
+
+is_regular(File) ->
+ do_is_regular(File, file).
+
+is_regular(File, Mod) when is_atom(Mod) ->
+ do_is_regular(File, Mod).
+
+fold_files(Dir, RegExp, Recursive, Fun, Acc) ->
+ do_fold_files(Dir, RegExp, Recursive, Fun, Acc, file).
+
+fold_files(Dir, RegExp, Recursive, Fun, Acc, Mod) when is_atom(Mod) ->
+ do_fold_files(Dir, RegExp, Recursive, Fun, Acc, Mod).
+
+last_modified(File) ->
+ do_last_modified(File, file).
+
+-spec last_modified(file:name(), atom()) -> file:date_time() | 0.
+last_modified(File, Mod) when is_atom(Mod) ->
+ do_last_modified(File, Mod).
+
+file_size(File) ->
+ do_file_size(File, file).
+
+-spec file_size(file:name(), atom()) -> non_neg_integer().
+file_size(File, Mod) when is_atom(Mod) ->
+ do_file_size(File, Mod).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+do_wildcard(Pattern, Mod) when is_list(Pattern) ->
+ do_wildcard_comp(do_compile_wildcard(Pattern), Mod).
+
+do_wildcard_comp({compiled_wildcard,{exists,File}}, Mod) ->
+ case eval_read_file_info(File, Mod) of
+ {ok,_} -> [File];
+ _ -> []
+ end;
+do_wildcard_comp({compiled_wildcard,[Base|Rest]}, Mod) ->
+ do_wildcard_1([Base], Rest, Mod).
+
+do_wildcard(Pattern, Cwd, Mod) when is_list(Pattern), (is_list(Cwd) or is_binary(Cwd)) ->
+ do_wildcard_comp(do_compile_wildcard(Pattern), Cwd, Mod).
+
+do_wildcard_comp({compiled_wildcard,{exists,File}}, Cwd, Mod) ->
+ case eval_read_file_info(filename:absname(File, Cwd), Mod) of
+ {ok,_} -> [File];
+ _ -> []
+ end;
+do_wildcard_comp({compiled_wildcard,[current|Rest]}, Cwd0, Mod) ->
+ {Cwd,PrefixLen} = case filename:join([Cwd0]) of
+ Bin when is_binary(Bin) -> {Bin,byte_size(Bin)+1};
+ Other -> {Other,length(Other)+1}
+ end, %Slash away redundant slashes.
+ [
+ if
+ is_binary(N) ->
+ <<_:PrefixLen/binary,Res/binary>> = N,
+ Res;
+ true ->
+ lists:nthtail(PrefixLen, N)
+ end || N <- do_wildcard_1([Cwd], Rest, Mod)];
+do_wildcard_comp({compiled_wildcard,[Base|Rest]}, _Cwd, Mod) ->
+ do_wildcard_1([Base], Rest, Mod).
+
+do_is_dir(Dir, Mod) ->
+ case eval_read_file_info(Dir, Mod) of
+ {ok, #file_info{type=directory}} ->
+ true;
+ _ ->
+ false
+ end.
+
+do_is_file(File, Mod) ->
+ case eval_read_file_info(File, Mod) of
+ {ok, #file_info{type=regular}} ->
+ true;
+ {ok, #file_info{type=directory}} ->
+ true;
+ _ ->
+ false
+ end.
+
+do_is_regular(File, Mod) ->
+ case eval_read_file_info(File, Mod) of
+ {ok, #file_info{type=regular}} ->
+ true;
+ _ ->
+ false
+ end.
+
+%% fold_files(Dir, RegExp, Recursive, Fun, AccIn).
+
+%% folds the function Fun(F, Acc) -> Acc1 over
+%% all files <F> in <Dir> that match the regular expression <RegExp>
+%% If <Recursive> is true all sub-directories to <Dir> are processed
+
+do_fold_files(Dir, RegExp, Recursive, Fun, Acc, Mod) ->
+ {ok, Re1} = re:compile(RegExp,[unicode]),
+ do_fold_files1(Dir, Re1, RegExp, Recursive, Fun, Acc, Mod).
+
+do_fold_files1(Dir, RegExp, OrigRE, Recursive, Fun, Acc, Mod) ->
+ case eval_list_dir(Dir, Mod) of
+ {ok, Files} -> do_fold_files2(Files, Dir, RegExp, OrigRE,
+ Recursive, Fun, Acc, Mod);
+ {error, _} -> Acc
+ end.
+
+%% OrigRE is not to be compiled as it's for non conforming filenames,
+%% i.e. for filenames that does not comply to the current encoding, which should
+%% be very rare. We use it only in those cases and do not want to precompile.
+do_fold_files2([], _Dir, _RegExp, _OrigRE, _Recursive, _Fun, Acc, _Mod) ->
+ Acc;
+do_fold_files2([File|T], Dir, RegExp, OrigRE, Recursive, Fun, Acc0, Mod) ->
+ FullName = filename:join(Dir, File),
+ case do_is_regular(FullName, Mod) of
+ true ->
+ case (catch re:run(File, if is_binary(File) -> OrigRE;
+ true -> RegExp end,
+ [{capture,none}])) of
+ match ->
+ Acc = Fun(FullName, Acc0),
+ do_fold_files2(T, Dir, RegExp, OrigRE, Recursive, Fun, Acc, Mod);
+ {'EXIT',_} ->
+ do_fold_files2(T, Dir, RegExp, OrigRE, Recursive, Fun, Acc0, Mod);
+ nomatch ->
+ do_fold_files2(T, Dir, RegExp, OrigRE, Recursive, Fun, Acc0, Mod)
+ end;
+ false ->
+ case Recursive andalso do_is_dir(FullName, Mod) of
+ true ->
+ Acc1 = do_fold_files1(FullName, RegExp, OrigRE, Recursive,
+ Fun, Acc0, Mod),
+ do_fold_files2(T, Dir, RegExp, OrigRE, Recursive, Fun, Acc1, Mod);
+ false ->
+ do_fold_files2(T, Dir, RegExp, OrigRE, Recursive, Fun, Acc0, Mod)
+ end
+ end.
+
+do_last_modified(File, Mod) ->
+ case eval_read_file_info(File, Mod) of
+ {ok, Info} ->
+ Info#file_info.mtime;
+ _ ->
+ 0
+ end.
+
+do_file_size(File, Mod) ->
+ case eval_read_file_info(File, Mod) of
+ {ok, Info} ->
+ Info#file_info.size;
+ _ ->
+ 0
+ end.
+
+%%----------------------------------------------------------------------
+%% +type ensure_dir(X) -> ok | {error, Reason}.
+%% +type X = filename() | dirname()
+%% ensures that the directory name required to create D exists
+
+ensure_dir("/") ->
+ ok;
+ensure_dir(F) ->
+ Dir = filename:dirname(F),
+ case do_is_dir(Dir, file) of
+ true ->
+ ok;
+ false ->
+ ensure_dir(Dir),
+ case file2:make_dir(Dir) of
+ {error,eexist}=EExist ->
+ case do_is_dir(Dir, file) of
+ true ->
+ ok;
+ false ->
+ EExist
+ end;
+ Err ->
+ Err
+ end
+ end.
+
+
+%%%
+%%% Pattern matching using a compiled wildcard.
+%%%
+
+do_wildcard_1(Files, Pattern, Mod) ->
+ do_wildcard_2(Files, Pattern, [], Mod).
+
+do_wildcard_2([File|Rest], Pattern, Result, Mod) ->
+ do_wildcard_2(Rest, Pattern, do_wildcard_3(File, Pattern, Result, Mod), Mod);
+do_wildcard_2([], _, Result, _Mod) ->
+ Result.
+
+do_wildcard_3(Base, [Pattern|Rest], Result, Mod) ->
+ case do_list_dir(Base, Mod) of
+ {ok, Files0} ->
+ Files = lists:sort(Files0),
+ Matches = wildcard_4(Pattern, Files, Base, []),
+ do_wildcard_2(Matches, Rest, Result, Mod);
+ _ ->
+ Result
+ end;
+do_wildcard_3(Base, [], Result, _Mod) ->
+ [Base|Result].
+
+wildcard_4(Pattern, [File|Rest], Base, Result) when is_binary(File) ->
+ case wildcard_5(Pattern, binary_to_list(File)) of
+ true ->
+ wildcard_4(Pattern, Rest, Base, [join(Base, File)|Result]);
+ false ->
+ wildcard_4(Pattern, Rest, Base, Result)
+ end;
+wildcard_4(Pattern, [File|Rest], Base, Result) ->
+ case wildcard_5(Pattern, File) of
+ true ->
+ wildcard_4(Pattern, Rest, Base, [join(Base, File)|Result]);
+ false ->
+ wildcard_4(Pattern, Rest, Base, Result)
+ end;
+wildcard_4(_Patt, [], _Base, Result) ->
+ Result.
+
+wildcard_5([question|Rest1], [_|Rest2]) ->
+ wildcard_5(Rest1, Rest2);
+wildcard_5([accept], _) ->
+ true;
+wildcard_5([star|Rest], File) ->
+ do_star(Rest, File);
+wildcard_5([{one_of, Ordset}|Rest], [C|File]) ->
+ case ordsets:is_element(C, Ordset) of
+ true -> wildcard_5(Rest, File);
+ false -> false
+ end;
+wildcard_5([{alt, Alts}], File) ->
+ do_alt(Alts, File);
+wildcard_5([C|Rest1], [C|Rest2]) when is_integer(C) ->
+ wildcard_5(Rest1, Rest2);
+wildcard_5([X|_], [Y|_]) when is_integer(X), is_integer(Y) ->
+ false;
+wildcard_5([], []) ->
+ true;
+wildcard_5([], [_|_]) ->
+ false;
+wildcard_5([_|_], []) ->
+ false.
+
+do_star(Pattern, [X|Rest]) ->
+ case wildcard_5(Pattern, [X|Rest]) of
+ true -> true;
+ false -> do_star(Pattern, Rest)
+ end;
+do_star(Pattern, []) ->
+ wildcard_5(Pattern, []).
+
+do_alt([Alt|Rest], File) ->
+ case wildcard_5(Alt, File) of
+ true -> true;
+ false -> do_alt(Rest, File)
+ end;
+do_alt([], _File) ->
+ false.
+
+do_list_dir(current, Mod) -> eval_list_dir(".", Mod);
+do_list_dir(Dir, Mod) -> eval_list_dir(Dir, Mod).
+
+join(current, File) -> File;
+join(Base, File) -> filename:join(Base, File).
+
+
+%%% Compiling a wildcard.
+
+compile_wildcard(Pattern) ->
+ ?HANDLE_ERROR(do_compile_wildcard(Pattern)).
+
+do_compile_wildcard(Pattern) ->
+ {compiled_wildcard,compile_wildcard_1(Pattern)}.
+
+compile_wildcard_1(Pattern) ->
+ [Root|Rest] = filename:split(Pattern),
+ case filename:pathtype(Root) of
+ relative ->
+ compile_wildcard_2([Root|Rest], current);
+ _ ->
+ compile_wildcard_2(Rest, [Root])
+ end.
+
+compile_wildcard_2([Part|Rest], Root) ->
+ case compile_part(Part) of
+ Part ->
+ compile_wildcard_2(Rest, join(Root, Part));
+ Pattern ->
+ compile_wildcard_3(Rest, [Pattern,Root])
+ end;
+compile_wildcard_2([], Root) -> {exists,Root}.
+
+compile_wildcard_3([Part|Rest], Result) ->
+ compile_wildcard_3(Rest, [compile_part(Part)|Result]);
+compile_wildcard_3([], Result) ->
+ lists:reverse(Result).
+
+compile_part(Part) ->
+ compile_part(Part, false, []).
+
+compile_part_to_sep(Part) ->
+ compile_part(Part, true, []).
+
+compile_part([], true, _) ->
+ error(missing_delimiter);
+compile_part([$,|Rest], true, Result) ->
+ {ok, $,, lists:reverse(Result), Rest};
+compile_part([$}|Rest], true, Result) ->
+ {ok, $}, lists:reverse(Result), Rest};
+compile_part([$?|Rest], Upto, Result) ->
+ compile_part(Rest, Upto, [question|Result]);
+compile_part([$*], Upto, Result) ->
+ compile_part([], Upto, [accept|Result]);
+compile_part([$*|Rest], Upto, Result) ->
+ compile_part(Rest, Upto, [star|Result]);
+compile_part([$[|Rest], Upto, Result) ->
+ case compile_charset(Rest, ordsets:new()) of
+ {ok, Charset, Rest1} ->
+ compile_part(Rest1, Upto, [Charset|Result]);
+ error ->
+ compile_part(Rest, Upto, [$[|Result])
+ end;
+compile_part([${|Rest], Upto, Result) ->
+ case compile_alt(Rest) of
+ {ok, Alt} ->
+ lists:reverse(Result, [Alt]);
+ error ->
+ compile_part(Rest, Upto, [${|Result])
+ end;
+compile_part([X|Rest], Upto, Result) ->
+ compile_part(Rest, Upto, [X|Result]);
+compile_part([], _Upto, Result) ->
+ lists:reverse(Result).
+
+compile_charset([$]|Rest], Ordset) ->
+ compile_charset1(Rest, ordsets:add_element($], Ordset));
+compile_charset([$-|Rest], Ordset) ->
+ compile_charset1(Rest, ordsets:add_element($-, Ordset));
+compile_charset([], _Ordset) ->
+ error;
+compile_charset(List, Ordset) ->
+ compile_charset1(List, Ordset).
+
+compile_charset1([Lower, $-, Upper|Rest], Ordset) when Lower =< Upper ->
+ compile_charset1(Rest, compile_range(Lower, Upper, Ordset));
+compile_charset1([$]|Rest], Ordset) ->
+ {ok, {one_of, Ordset}, Rest};
+compile_charset1([X|Rest], Ordset) ->
+ compile_charset1(Rest, ordsets:add_element(X, Ordset));
+compile_charset1([], _Ordset) ->
+ error.
+
+compile_range(Lower, Current, Ordset) when Lower =< Current ->
+ compile_range(Lower, Current-1, ordsets:add_element(Current, Ordset));
+compile_range(_, _, Ordset) ->
+ Ordset.
+
+compile_alt(Pattern) ->
+ compile_alt(Pattern, []).
+
+compile_alt(Pattern, Result) ->
+ case compile_part_to_sep(Pattern) of
+ {ok, $,, AltPattern, Rest} ->
+ compile_alt(Rest, [AltPattern|Result]);
+ {ok, $}, AltPattern, Rest} ->
+ NewResult = [AltPattern|Result],
+ RestPattern = compile_part(Rest),
+ {ok, {alt, [Alt++RestPattern || Alt <- NewResult]}};
+ Pattern ->
+ error
+ end.
+
+error(Reason) ->
+ erlang:error({badpattern,Reason}).
+
+eval_read_file_info(File, file) ->
+ file2:read_file_info(File);
+eval_read_file_info(File, erl_prim_loader) ->
+ case erl_prim_loader:read_file_info(File) of
+ error -> {error, erl_prim_loader};
+ Res-> Res
+ end;
+eval_read_file_info(File, Mod) ->
+ Mod:read_file_info(File).
+
+eval_list_dir(Dir, file) ->
+ file2:list_dir(Dir);
+eval_list_dir(Dir, erl_prim_loader) ->
+ case erl_prim_loader:list_dir(Dir) of
+ error -> {error, erl_prim_loader};
+ Res-> Res
+ end;
+eval_list_dir(Dir, Mod) ->
+ Mod:list_dir(Dir).
diff --git a/src/io_runner.erl b/src/io_runner.erl
new file mode 100644
index 00000000..8a252158
--- /dev/null
+++ b/src/io_runner.erl
@@ -0,0 +1,83 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%%
+
+-module(io_runner).
+
+-behaviour(gen_server2).
+
+-export([start_link/0, submit/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(SERVER, ?MODULE).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
+ [{timeout, infinity}]).
+
+submit(Fun) ->
+ %% If the io_runner is not running, just run the Fun in the
+ %% current process.
+ case whereis(?SERVER) of
+ undefined -> run(Fun);
+ _ -> gen_server2:call(?SERVER, {run, Fun}, infinity)
+ end.
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, nostate, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call({run, Fun}, _From, State) ->
+ {reply, run(Fun), State, hibernate};
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, State}.
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+%%----------------------------------------------------------------------------
+
+run({M, F, A}) ->
+ apply(M, F, A);
+run(Fun) ->
+ Fun().
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b8dbccc7..ffa40583 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -58,6 +58,12 @@
{requires, pre_boot},
{enables, external_infrastructure}]}).
+-rabbit_boot_step({io_runner,
+ [{description, "IO runner"},
+ {mfa, {rabbit_sup, start_child, [io_runner]}},
+ {requires, pre_boot},
+ {enables, external_infrastructure}]}).
+
-rabbit_boot_step({external_infrastructure,
[{description, "external infrastructure ready"}]}).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0b39a209..3765da85 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -526,7 +526,7 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
-read_term_file(File) -> file:consult(File).
+read_term_file(File) -> file2:consult(File).
write_term_file(File, Terms) ->
write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
@@ -544,12 +544,12 @@ write_file(Path, Data, Modes) ->
Modes1 = [binary, write | (Modes -- [binary, write])],
case make_binary(Data) of
Bin when is_binary(Bin) ->
- case file:open(Path, Modes1) of
- {ok, Hdl} -> try file:write(Hdl, Bin) of
- ok -> file:sync(Hdl);
+ case file2:open(Path, Modes1) of
+ {ok, Hdl} -> try file2:write(Hdl, Bin) of
+ ok -> file2:sync(Hdl);
{error, _} = E -> E
after
- file:close(Hdl)
+ file2:close(Hdl)
end;
{error, _} = E -> E
end;
@@ -567,7 +567,7 @@ make_binary(List) ->
append_file(File, Suffix) ->
- case file:read_file_info(File) of
+ case file2:read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
{error, enoent} -> append_file(File, 0, Suffix);
Error -> Error
@@ -576,18 +576,18 @@ append_file(File, Suffix) ->
append_file(_, _, "") ->
ok;
append_file(File, 0, Suffix) ->
- case file:open([File, Suffix], [append]) of
- {ok, Fd} -> file:close(Fd);
+ case file2:open([File, Suffix], [append]) of
+ {ok, Fd} -> file2:close(Fd);
Error -> Error
end;
append_file(File, _, Suffix) ->
- case file:read_file(File) of
+ case file2:read_file(File) of
{ok, Data} -> write_file([File, Suffix], Data, [append]);
Error -> Error
end.
ensure_parent_dirs_exist(Filename) ->
- case filelib:ensure_dir(Filename) of
+ case filelib2:ensure_dir(Filename) of
ok -> ok;
{error, Reason} ->
throw({error, {cannot_create_parent_dirs, Filename, Reason}})
@@ -749,13 +749,13 @@ recursive_delete(Files) ->
end, ok, Files).
recursive_delete1(Path) ->
- case filelib:is_dir(Path) and not(is_symlink(Path)) of
- false -> case file:delete(Path) of
+ case filelib2:is_dir(Path) and not(is_symlink(Path)) of
+ false -> case file2:delete(Path) of
ok -> ok;
{error, enoent} -> ok; %% Path doesn't exist anyway
{error, Err} -> {error, {Path, Err}}
end;
- true -> case file:list_dir(Path) of
+ true -> case file2:list_dir(Path) of
{ok, FileNames} ->
case lists:foldl(
fun (FileName, ok) ->
@@ -765,7 +765,7 @@ recursive_delete1(Path) ->
Error
end, ok, FileNames) of
ok ->
- case file:del_dir(Path) of
+ case file2:del_dir(Path) of
ok -> ok;
{error, Err} -> {error, {Path, Err}}
end;
@@ -784,15 +784,15 @@ is_symlink(Name) ->
end.
recursive_copy(Src, Dest) ->
- case filelib:is_dir(Src) of
- false -> case file:copy(Src, Dest) of
+ case filelib2:is_dir(Src) of
+ false -> case file2:copy(Src, Dest) of
{ok, _Bytes} -> ok;
{error, enoent} -> ok; %% Path doesn't exist anyway
{error, Err} -> {error, {Src, Dest, Err}}
end;
- true -> case file:list_dir(Src) of
+ true -> case file2:list_dir(Src) of
{ok, FileNames} ->
- case file:make_dir(Dest) of
+ case file2:make_dir(Dest) of
ok ->
lists:foldl(
fun (FileName, ok) ->
@@ -902,10 +902,10 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
%% TODO: When we stop supporting Erlang prior to R14, this should be
%% replaced with file:open [write, exclusive]
lock_file(Path) ->
- case filelib:is_file(Path) of
+ case filelib2:is_file(Path) of
true -> {error, eexist};
- false -> {ok, Lock} = file:open(Path, [write]),
- ok = file:close(Lock)
+ false -> {ok, Lock} = file2:open(Path, [write]),
+ ok = file2:close(Lock)
end.
const_ok() -> ok.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 636913b5..2d8f6354 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -229,7 +229,7 @@
init(Name, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
- false = filelib:is_file(Dir), %% is_file == is file or dir
+ false = filelib2:is_file(Dir), %% is_file == is file or dir
State #qistate { on_sync = OnSyncFun }.
shutdown_terms(Name) ->
@@ -366,9 +366,9 @@ recover(DurableQueues) ->
{DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
all_queue_directory_names(Dir) ->
- case file:list_dir(Dir) of
+ case file2:list_dir(Dir) of
{ok, Entries} -> [ Entry || Entry <- Entries,
- filelib:is_dir(
+ filelib2:is_dir(
filename:join(Dir, Entry)) ];
{error, enoent} -> []
end.
@@ -392,7 +392,7 @@ blank_state(QueueName) ->
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
detect_clean_shutdown(Dir) ->
- case file:delete(clean_file_name(Dir)) of
+ case file2:delete(clean_file_name(Dir)) of
ok -> true;
{error, enoent} -> false
end.
@@ -402,7 +402,7 @@ read_shutdown_terms(Dir) ->
store_clean_shutdown(Terms, Dir) ->
CleanFileName = clean_file_name(Dir),
- ok = filelib:ensure_dir(CleanFileName),
+ ok = filelib2:ensure_dir(CleanFileName),
rabbit_misc:write_term_file(CleanFileName, Terms).
init_clean(RecoveredCounts, State) ->
@@ -603,8 +603,8 @@ flush_journal(State = #qistate { segments = Segments }) ->
Segments1 =
segment_fold(
fun (#segment { unacked = 0, path = Path }, SegmentsN) ->
- case filelib:is_file(Path) of
- true -> ok = file:delete(Path);
+ case filelib2:is_file(Path) of
+ true -> ok = file2:delete(Path);
false -> ok
end,
SegmentsN;
@@ -630,7 +630,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
- ok = filelib:ensure_dir(Path),
+ ok = filelib2:ensure_dir(Path),
{ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
@@ -735,7 +735,7 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
lists:takewhile(fun (C) -> $0 =< C andalso C =< $9 end,
SegName)), Set)
end, sets:from_list(segment_nums(Segments)),
- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))).
+ filelib2:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))).
segment_find_or_new(Seg, Dir, Segments) ->
case segment_find(Seg, Segments) of
@@ -836,7 +836,7 @@ segment_entries_foldr(Fun, Init,
%%
%% Does not do any combining with the journal at all.
load_segment(KeepAcked, #segment { path = Path }) ->
- case filelib:is_file(Path) of
+ case filelib2:is_file(Path) of
false -> {array_new(), 0};
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
@@ -1040,12 +1040,12 @@ foreach_queue_index(Funs) ->
transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
[ok = transform_file(filename:join(Dir, Seg), SegmentFun)
- || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)],
+ || Seg <- filelib2:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)],
ok = gatherer:finish(Gatherer).
transform_file(Path, Fun) ->
PathTmp = Path ++ ".upgrade",
- case filelib:file_size(Path) of
+ case filelib2:file_size(Path) of
0 -> ok;
Size -> {ok, PathTmpHdl} =
file_handle_cache:open(PathTmp, ?WRITE_MODE,
@@ -1059,7 +1059,7 @@ transform_file(Path, Fun) ->
ok = drive_transform_fun(Fun, PathTmpHdl, Content),
ok = file_handle_cache:close(PathTmpHdl),
- ok = file:rename(PathTmp, Path)
+ ok = file2:rename(PathTmp, Path)
end.
drive_transform_fun(Fun, Hdl, Contents) ->