diff options
authorPaul J. Davis <>2017-09-06 13:36:16 -0500
committerPaul J. Davis <>2020-04-14 11:26:13 -0500
commitdabd4e244b7807ce71e9f6905d184d4a8e135410 (patch)
parent1caf374bdc89fbe41ecb0908ccb7c170640612af (diff)
Add multi-append functions to couch_file
These functions allow the caller to append multiple terms or binaries to a file and receive the file position and size for each individual element. This is to optimize throughput in situations where we want to write multiple pieces of independant data.
1 files changed, 135 insertions, 27 deletions
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index 6db23eaa3..b1e355518 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -42,6 +42,8 @@
-export([append_binary/2, append_binary_md5/2]).
-export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
-export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
+-export([pread_terms/2, pread_binaries/2, pread_iolists/2]).
+-export([append_terms/2, append_terms/3, append_binaries/2]).
-export([write_header/2, read_header/1]).
-export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
@@ -119,6 +121,7 @@ append_term_md5(Fd, Term, Options) ->
Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
append_binary_md5(Fd, couch_compress:compress(Term, Comp)).
%% Purpose: To append an Erlang binary to the end of the file.
%% Args: Erlang term to serialize and append to the file.
@@ -129,7 +132,7 @@ append_term_md5(Fd, Term, Options) ->
append_binary(Fd, Bin) ->
ioq:call(Fd, {append_bin, assemble_file_chunk(Bin)}, erlang:get(io_priority)).
append_binary_md5(Fd, Bin) ->
{append_bin, assemble_file_chunk(Bin, couch_hash:md5_hash(Bin))},
@@ -172,21 +175,55 @@ pread_binary(Fd, Pos) ->
pread_iolist(Fd, Pos) ->
case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
- {ok, IoList, <<>>} ->
- {ok, IoList};
- {ok, IoList, Md5} ->
- case couch_hash:md5_hash(IoList) of
- Md5 ->
- {ok, IoList};
- _ ->
- couch_log:emergency("File corruption in ~p at position ~B",
- [Fd, Pos]),
- exit({file_corruption, <<"file corruption">>})
- end;
- Error ->
- Error
+ {ok, IoList, Md5} ->
+ {ok, verify_md5(Fd, Pos, IoList, Md5)};
+ Error ->
+ Error
+ end.
+pread_terms(Fd, PosList) ->
+ {ok, Bins} = pread_binaries(Fd, PosList),
+ Terms = lists:map(fun(Bin) ->
+ couch_compress:decompress(Bin)
+ end, Bins),
+ {ok, Terms}.
+pread_binaries(Fd, PosList) ->
+ {ok, Data} = pread_iolists(Fd, PosList),
+ {ok, lists:map(fun erlang:iolist_to_binary/1, Data)}.
+pread_iolists(Fd, PosList) ->
+ case ioq:call(Fd, {pread_iolists, PosList}, erlang:get(io_priority)) of
+ {ok, DataMd5s} ->
+ Data = lists:zipwith(fun(Pos, {IoList, Md5}) ->
+ verify_md5(Fd, Pos, IoList, Md5)
+ end, PosList, DataMd5s),
+ {ok, Data};
+ Error ->
+ Error
+append_terms(Fd, Terms) ->
+ append_terms(Fd, Terms, []).
+append_terms(Fd, Terms, Options) ->
+ Comp = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
+ Bins = lists:map(fun(Term) ->
+ couch_compress:compress(Term, Comp)
+ end, Terms),
+ append_binaries(Fd, Bins).
+append_binaries(Fd, Bins) ->
+ WriteBins = lists:map(fun assemble_file_chunk/1, Bins),
+ ioq:call(Fd, {append_bins, WriteBins}, erlang:get(io_priority)).
%% Purpose: The length of a file, in bytes.
%% Returns: {ok, Bytes}
@@ -464,6 +501,30 @@ handle_call({pread_iolist, Pos}, _From, File) ->
{reply, {ok, Iolist, <<>>}, File}
+handle_call({pread_iolists, PosL}, _From, File) ->
+ update_read_timestamp(),
+ LocNums1 = [{Pos, 4} || Pos <- PosL],
+ DataSizes = read_multi_raw_iolists_int(File, LocNums1),
+ LocNums2 = lists:map(fun({LenIoList, NextPos}) ->
+ case iolist_to_binary(LenIoList) of
+ <<1:1/integer, Len:31/integer>> -> % an MD5-prefixed term
+ {NextPos, Len + 16};
+ <<0:1/integer, Len:31/integer>> ->
+ {NextPos, Len}
+ end
+ end, DataSizes),
+ Resps = read_multi_raw_iolists_int(File, LocNums2),
+ Extracted = lists:zipwith(fun({LenIoList, _}, {IoList, _}) ->
+ case iolist_to_binary(LenIoList) of
+ <<1:1/integer, _:31/integer>> ->
+ {Md5, IoList} = extract_md5(IoList),
+ {IoList, Md5};
+ <<0:1/integer, _:31/integer>> ->
+ {IoList, <<>>}
+ end
+ end, DataSizes, Resps),
+ {reply, {ok, Extracted}, File};
handle_call(bytes, _From, #file{fd = Fd} = File) ->
{reply, file:position(Fd, eof), File};
@@ -506,6 +567,20 @@ handle_call({append_bin, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
{reply, Error, reset_eof(File)}
+handle_call({append_bins, Bins}, _From, #file{fd = Fd, eof = Pos} = File) ->
+ {BlockResps, FinalPos} = lists:mapfoldl(fun(Bin, PosAcc) ->
+ Blocks = make_blocks(PosAcc rem ?SIZE_BLOCK, Bin),
+ Size = iolist_size(Blocks),
+ {{Blocks, {PosAcc, Size}}, PosAcc + Size}
+ end, Pos, Bins),
+ {AllBlocks, Resps} = lists:unzip(BlockResps),
+ case file:write(Fd, AllBlocks) of
+ ok ->
+ {reply, {ok, Resps}, File#file{eof = FinalPos}};
+ Error ->
+ {reply, Error, reset_eof(File)}
+ end;
handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
BinSize = byte_size(Bin),
case Pos rem ?SIZE_BLOCK of
@@ -634,23 +709,40 @@ find_newest_header(Fd, [{Location, Size} | LocationSizes]) ->
{Data::iolist(), CurPos::non_neg_integer()}.
read_raw_iolist_int(Fd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE
read_raw_iolist_int(Fd, Pos, Len);
-read_raw_iolist_int(#file{fd = Fd, pread_limit = Limit} = F, Pos, Len) ->
+read_raw_iolist_int(#file{fd = Fd} = File, Pos, Len) ->
+ {Pos, TotalBytes} = get_pread_locnum(File, Pos, Len),
+ {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
+ {remove_block_prefixes(Pos rem ?SIZE_BLOCK, RawBin), Pos + TotalBytes}.
+read_multi_raw_iolists_int(#file{fd = Fd} = File, PosLens) ->
+ LocNums = lists:map(fun({Pos, Len}) ->
+ get_pread_locnum(File, Pos, Len)
+ end, PosLens),
+ {ok, Bins} = file:pread(Fd, LocNums),
+ lists:zipwith(fun({Pos, TotalBytes}, Bin) ->
+ <<RawBin:TotalBytes/binary>> = Bin,
+ {remove_block_prefixes(Pos rem ?SIZE_BLOCK, RawBin), Pos + TotalBytes}
+ end, LocNums, Bins).
+get_pread_locnum(File, Pos, Len) ->
BlockOffset = Pos rem ?SIZE_BLOCK,
TotalBytes = calculate_total_read_len(BlockOffset, Len),
- if
- (Pos + TotalBytes) > F#file.eof ->
- couch_stats:increment_counter([pread, exceed_eof]),
- {_Fd, Filepath} = get(couch_file_fd),
- throw({read_beyond_eof, Filepath});
- TotalBytes > Limit ->
- couch_stats:increment_counter([pread, exceed_limit]),
- {_Fd, Filepath} = get(couch_file_fd),
- throw({exceed_pread_limit, Filepath, Limit});
- true ->
- {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
- {remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes}
+ case Pos + TotalBytes of
+ Size when Size > File#file.eof ->
+ couch_stats:increment_counter([pread, exceed_eof]),
+ {_Fd, Filepath} = get(couch_file_fd),
+ throw({read_beyond_eof, Filepath});
+ Size when Size > File#file.pread_limit ->
+ couch_stats:increment_counter([pread, exceed_limit]),
+ {_Fd, Filepath} = get(couch_file_fd),
+ throw({exceed_pread_limit, Filepath, File#file.pread_limit});
+ _ ->
+ {Pos, TotalBytes}
-spec extract_md5(iolist()) -> {binary(), iolist()}.
extract_md5(FullIoList) ->
{Md5List, IoList} = split_iolist(FullIoList, 16, []),
@@ -722,6 +814,22 @@ monitored_by_pids() ->
{monitored_by, PidsAndRefs} = process_info(self(), monitored_by),
lists:filter(fun is_pid/1, PidsAndRefs).
+verify_md5(_Fd, _Pos, IoList, <<>>) ->
+ IoList;
+verify_md5(Fd, Pos, IoList, Md5) ->
+ case couch_hash:md5_hash(IoList) of
+ Md5 -> IoList;
+ _ -> report_md5_error(Fd, Pos)
+ end.
+report_md5_error(Fd, Pos) ->
+ couch_log:emergency("File corruption in ~p at position ~B", [Fd, Pos]),
+ exit({file_corruption, <<"file corruption">>}).
% System dbs aren't monitored by couch_stats_process_tracker
is_idle(#file{is_sys=true}) ->
case monitored_by_pids() of