diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2017-09-06 13:36:16 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2020-04-14 11:26:13 -0500 |
commit | dabd4e244b7807ce71e9f6905d184d4a8e135410 (patch) | |
tree | e6b8f0d72a08e1a71d0118de5e6cd94515a74043 | |
parent | 1caf374bdc89fbe41ecb0908ccb7c170640612af (diff) | |
download | couchdb-dabd4e244b7807ce71e9f6905d184d4a8e135410.tar.gz |
Add multi-append functions to couch_file
These functions allow the caller to append multiple terms or binaries to
a file and receive the file position and size for each individual
element. This is to optimize throughput in situations where we want to
write multiple pieces of independent data.
-rw-r--r-- | src/couch/src/couch_file.erl | 162 |
1 files changed, 135 insertions, 27 deletions
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl index 6db23eaa3..b1e355518 100644 --- a/src/couch/src/couch_file.erl +++ b/src/couch/src/couch_file.erl @@ -42,6 +42,8 @@ -export([append_binary/2, append_binary_md5/2]). -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]). -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]). +-export([pread_terms/2, pread_binaries/2, pread_iolists/2]). +-export([append_terms/2, append_terms/3, append_binaries/2]). -export([write_header/2, read_header/1]). -export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]). -export([last_read/1]). @@ -119,6 +121,7 @@ append_term_md5(Fd, Term, Options) -> Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION), append_binary_md5(Fd, couch_compress:compress(Term, Comp)). + %%---------------------------------------------------------------------- %% Purpose: To append an Erlang binary to the end of the file. %% Args: Erlang term to serialize and append to the file. @@ -129,7 +132,7 @@ append_term_md5(Fd, Term, Options) -> append_binary(Fd, Bin) -> ioq:call(Fd, {append_bin, assemble_file_chunk(Bin)}, erlang:get(io_priority)). - + append_binary_md5(Fd, Bin) -> ioq:call(Fd, {append_bin, assemble_file_chunk(Bin, couch_hash:md5_hash(Bin))}, @@ -172,21 +175,55 @@ pread_binary(Fd, Pos) -> pread_iolist(Fd, Pos) -> case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of - {ok, IoList, <<>>} -> - {ok, IoList}; - {ok, IoList, Md5} -> - case couch_hash:md5_hash(IoList) of - Md5 -> - {ok, IoList}; - _ -> - couch_log:emergency("File corruption in ~p at position ~B", - [Fd, Pos]), - exit({file_corruption, <<"file corruption">>}) - end; - Error -> - Error + {ok, IoList, Md5} -> + {ok, verify_md5(Fd, Pos, IoList, Md5)}; + Error -> + Error + end. 
+ + +pread_terms(Fd, PosList) -> + {ok, Bins} = pread_binaries(Fd, PosList), + Terms = lists:map(fun(Bin) -> + couch_compress:decompress(Bin) + end, Bins), + {ok, Terms}. + + +pread_binaries(Fd, PosList) -> + {ok, Data} = pread_iolists(Fd, PosList), + {ok, lists:map(fun erlang:iolist_to_binary/1, Data)}. + + +pread_iolists(Fd, PosList) -> + case ioq:call(Fd, {pread_iolists, PosList}, erlang:get(io_priority)) of + {ok, DataMd5s} -> + Data = lists:zipwith(fun(Pos, {IoList, Md5}) -> + verify_md5(Fd, Pos, IoList, Md5) + end, PosList, DataMd5s), + {ok, Data}; + Error -> + Error end. + +append_terms(Fd, Terms) -> + append_terms(Fd, Terms, []). + + +append_terms(Fd, Terms, Options) -> + Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION), + Bins = lists:map(fun(Term) -> + couch_compress:compress(Term, Comp) + end, Terms), + append_binaries(Fd, Bins). + + +append_binaries(Fd, Bins) -> + WriteBins = lists:map(fun assemble_file_chunk/1, Bins), + ioq:call(Fd, {append_bins, WriteBins}, erlang:get(io_priority)). + + %%---------------------------------------------------------------------- %% Purpose: The length of a file, in bytes. 
%% Returns: {ok, Bytes} @@ -464,6 +501,30 @@ handle_call({pread_iolist, Pos}, _From, File) -> {reply, {ok, Iolist, <<>>}, File} end; +handle_call({pread_iolists, PosL}, _From, File) -> + update_read_timestamp(), + LocNums1 = [{Pos, 4} || Pos <- PosL], + DataSizes = read_multi_raw_iolists_int(File, LocNums1), + LocNums2 = lists:map(fun({LenIoList, NextPos}) -> + case iolist_to_binary(LenIoList) of + <<1:1/integer, Len:31/integer>> -> % an MD5-prefixed term + {NextPos, Len + 16}; + <<0:1/integer, Len:31/integer>> -> + {NextPos, Len} + end + end, DataSizes), + Resps = read_multi_raw_iolists_int(File, LocNums2), + Extracted = lists:zipwith(fun({LenIoList, _}, {IoList, _}) -> + case iolist_to_binary(LenIoList) of + <<1:1/integer, _:31/integer>> -> + {Md5, IoList} = extract_md5(IoList), + {IoList, Md5}; + <<0:1/integer, _:31/integer>> -> + {IoList, <<>>} + end + end, DataSizes, Resps), + {reply, {ok, Extracted}, File}; + handle_call(bytes, _From, #file{fd = Fd} = File) -> {reply, file:position(Fd, eof), File}; @@ -506,6 +567,20 @@ handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) -> {reply, Error, reset_eof(File)} end; +handle_call({append_bins, Bins}, _From, #file{fd = Fd, eof = Pos} = File) -> + {BlockResps, FinalPos} = lists:mapfoldl(fun(Bin, PosAcc) -> + Blocks = make_blocks(PosAcc rem ?SIZE_BLOCK, Bin), + Size = iolist_size(Blocks), + {{Blocks, {PosAcc, Size}}, PosAcc + Size} + end, Pos, Bins), + {AllBlocks, Resps} = lists:unzip(BlockResps), + case file:write(Fd, AllBlocks) of + ok -> + {reply, {ok, Resps}, File#file{eof = FinalPos}}; + Error -> + {reply, Error, reset_eof(File)} + end; + handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) -> BinSize = byte_size(Bin), case Pos rem ?SIZE_BLOCK of @@ -634,23 +709,40 @@ find_newest_header(Fd, [{Location, Size} | LocationSizes]) -> {Data::iolist(), CurPos::non_neg_integer()}. 
read_raw_iolist_int(Fd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE read_raw_iolist_int(Fd, Pos, Len); -read_raw_iolist_int(#file{fd = Fd, pread_limit = Limit} = F, Pos, Len) -> +read_raw_iolist_int(#file{fd = Fd} = File, Pos, Len) -> + {Pos, TotalBytes} = get_pread_locnum(File, Pos, Len), + {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes), + {remove_block_prefixes(Pos rem ?SIZE_BLOCK, RawBin), Pos + TotalBytes}. + + +read_multi_raw_iolists_int(#file{fd = Fd} = File, PosLens) -> + LocNums = lists:map(fun({Pos, Len}) -> + get_pread_locnum(File, Pos, Len) + end, PosLens), + {ok, Bins} = file:pread(Fd, LocNums), + lists:zipwith(fun({Pos, TotalBytes}, Bin) -> + <<RawBin:TotalBytes/binary>> = Bin, + {remove_block_prefixes(Pos rem ?SIZE_BLOCK, RawBin), Pos + TotalBytes} + end, LocNums, Bins). + + +get_pread_locnum(File, Pos, Len) -> BlockOffset = Pos rem ?SIZE_BLOCK, TotalBytes = calculate_total_read_len(BlockOffset, Len), - if - (Pos + TotalBytes) > F#file.eof -> - couch_stats:increment_counter([pread, exceed_eof]), - {_Fd, Filepath} = get(couch_file_fd), - throw({read_beyond_eof, Filepath}); - TotalBytes > Limit -> - couch_stats:increment_counter([pread, exceed_limit]), - {_Fd, Filepath} = get(couch_file_fd), - throw({exceed_pread_limit, Filepath, Limit}); - true -> - {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes), - {remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes} + case Pos + TotalBytes of + Size when Size > File#file.eof -> + couch_stats:increment_counter([pread, exceed_eof]), + {_Fd, Filepath} = get(couch_file_fd), + throw({read_beyond_eof, Filepath}); + Size when Size > File#file.pread_limit -> + couch_stats:increment_counter([pread, exceed_limit]), + {_Fd, Filepath} = get(couch_file_fd), + throw({exceed_pread_limit, Filepath, File#file.pread_limit}); + _ -> + {Pos, TotalBytes} end. + -spec extract_md5(iolist()) -> {binary(), iolist()}. 
extract_md5(FullIoList) -> {Md5List, IoList} = split_iolist(FullIoList, 16, []), @@ -722,6 +814,22 @@ monitored_by_pids() -> {monitored_by, PidsAndRefs} = process_info(self(), monitored_by), lists:filter(fun is_pid/1, PidsAndRefs). + +verify_md5(_Fd, _Pos, IoList, <<>>) -> + IoList; + +verify_md5(Fd, Pos, IoList, Md5) -> + case couch_hash:md5_hash(IoList) of + Md5 -> IoList; + _ -> report_md5_error(Fd, Pos) + end. + + +report_md5_error(Fd, Pos) -> + couch_log:emergency("File corruption in ~p at position ~B", [Fd, Pos]), + exit({file_corruption, <<"file corruption">>}). + + % System dbs aren't monitored by couch_stats_process_tracker is_idle(#file{is_sys=true}) -> case monitored_by_pids() of |