diff options
Diffstat (limited to 'src/couch/src/couch_stream.erl')
-rw-r--r-- | src/couch/src/couch_stream.erl | 255 |
1 files changed, 128 insertions, 127 deletions
diff --git a/src/couch/src/couch_stream.erl b/src/couch/src/couch_stream.erl index eb64484df..83b0611eb 100644 --- a/src/couch/src/couch_stream.erl +++ b/src/couch/src/couch_stream.erl @@ -14,21 +14,39 @@ -behaviour(gen_server). -vsn(1). -% public API --export([open/1, open/2, close/1]). --export([foldl/4, foldl/5, foldl_decode/6, range_foldl/6]). --export([copy_to_new_stream/3, write/2]). -% gen_server callbacks --export([init/1, terminate/2, code_change/3]). --export([handle_cast/2, handle_call/3, handle_info/2]). +-export([ + open/1, + open/2, + close/1, + + copy/2, + write/2, + to_disk_term/1, + + foldl/3, + foldl/4, + foldl_decode/5, + range_foldl/5 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + -include_lib("couch/include/couch_db.hrl"). -define(DEFAULT_BUFFER_SIZE, 4096). --record(stream, - {fd = 0, + +-record(stream, { + engine, opener_monitor, written_pointers=[], buffer_list = [], @@ -42,114 +60,94 @@ identity_len = 0, encoding_fun, end_encoding_fun - }). +}). + +open({_StreamEngine, _StreamEngineState} = Engine) -> + open(Engine, []). -%%% Interface functions %%% -open(Fd) -> - open(Fd, []). +open({_StreamEngine, _StreamEngineState} = Engine, Options) -> + gen_server:start_link(?MODULE, {Engine, self(), erlang:get(io_priority), Options}, []). -open(Fd, Options) -> - gen_server:start_link(couch_stream, {Fd, self(), erlang:get(io_priority), Options}, []). close(Pid) -> gen_server:call(Pid, close, infinity). -copy_to_new_stream(Fd, PosList, DestFd) -> - {ok, Dest} = open(DestFd), - foldl(Fd, PosList, - fun(Bin, _) -> - ok = write(Dest, Bin) - end, ok), - close(Dest). - -foldl(_Fd, [], _Fun, Acc) -> - Acc; -foldl(Fd, [Pos|Rest], Fun, Acc) -> - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - foldl(Fd, Rest, Fun, Fun(Bin, Acc)). - -foldl(Fd, PosList, <<>>, Fun, Acc) -> - foldl(Fd, PosList, Fun, Acc); -foldl(Fd, PosList, Md5, Fun, Acc) -> - foldl(Fd, PosList, Md5, crypto:hash_init(md5), Fun, Acc). - -foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) -> + +copy(Src, Dst) -> + foldl(Src, fun(Bin, _) -> + ok = write(Dst, Bin) + end, ok). + + +write(_Pid, <<>>) -> + ok; +write(Pid, Bin) -> + gen_server:call(Pid, {write, Bin}, infinity). + + +to_disk_term({Engine, EngineState}) -> + Engine:to_disk_term(EngineState). + + +foldl({Engine, EngineState}, Fun, Acc) -> + Engine:foldl(EngineState, Fun, Acc). + + +foldl(Engine, <<>>, Fun, Acc) -> + foldl(Engine, Fun, Acc); +foldl(Engine, Md5, UserFun, UserAcc) -> + InitAcc = {crypto:hash_init(md5), UserFun, UserAcc}, + {Md5Acc, _, OutAcc} = foldl(Engine, fun foldl_md5/2, InitAcc), + Md5 = crypto:hash_final(Md5Acc), + OutAcc. + + +foldl_decode(Engine, Md5, Enc, UserFun, UserAcc1) -> {DecDataFun, DecEndFun} = case Enc of - gzip -> - ungzip_init(); - identity -> - identity_enc_dec_funs() + gzip -> ungzip_init(); + identity -> identity_enc_dec_funs() end, - Result = foldl_decode( - DecDataFun, Fd, PosList, Md5, crypto:hash_init(md5), Fun, Acc - ), + InitAcc = {DecDataFun, UserFun, UserAcc1}, + {_, _, UserAcc2} = foldl(Engine, Md5, fun foldl_decode/2, InitAcc), DecEndFun(), - Result. + UserAcc2. + + +range_foldl(Engine, From, To, UserFun, UserAcc) when To >= From -> + NewEngine = do_seek(Engine, From), + InitAcc = {To - From, UserFun, UserAcc}, + try + {_, _, UserAcc2} = foldl(NewEngine, fun foldl_length/2, InitAcc), + UserAcc2 + catch + throw:{finished, UserAcc3} -> + UserAcc3 + end. -foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) -> - Md5 = crypto:hash_final(Md5Acc), - Acc; -foldl(Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> % 0110 UPGRADE CODE - foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc); -foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - Md5 = crypto:hash_final(crypto:hash_update(Md5Acc, Bin)), - Fun(Bin, Acc); -foldl(Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) -> - foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc); -foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - foldl(Fd, Rest, Md5, crypto:hash_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)). - -range_foldl(Fd, PosList, From, To, Fun, Acc) -> - range_foldl(Fd, PosList, From, To, 0, Fun, Acc). - -range_foldl(_Fd, _PosList, _From, To, Off, _Fun, Acc) when Off >= To -> - Acc; -range_foldl(Fd, [Pos|Rest], From, To, Off, Fun, Acc) when is_integer(Pos) -> % old-style attachment - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - range_foldl(Fd, [{Pos, iolist_size(Bin)}] ++ Rest, From, To, Off, Fun, Acc); -range_foldl(Fd, [{_Pos, Size}|Rest], From, To, Off, Fun, Acc) when From > Off + Size -> - range_foldl(Fd, Rest, From, To, Off + Size, Fun, Acc); -range_foldl(Fd, [{Pos, Size}|Rest], From, To, Off, Fun, Acc) -> - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - Bin1 = if - From =< Off andalso To >= Off + Size -> Bin; %% the whole block is covered - true -> - PrefixLen = clip(From - Off, 0, Size), - PostfixLen = clip(Off + Size - To, 0, Size), - MatchLen = Size - PrefixLen - PostfixLen, - <<_Prefix:PrefixLen/binary,Match:MatchLen/binary,_Postfix:PostfixLen/binary>> = iolist_to_binary(Bin), - Match - end, - range_foldl(Fd, Rest, From, To, Off + Size, Fun, Fun(Bin1, Acc)). -clip(Value, Lo, Hi) -> - if - Value < Lo -> Lo; - Value > Hi -> Hi; - true -> Value +foldl_md5(Bin, {Md5Acc, UserFun, UserAcc}) -> + NewMd5Acc = crypto:hash_update(Md5Acc, Bin), + {NewMd5Acc, UserFun, UserFun(Bin, UserAcc)}. + + +foldl_decode(EncBin, {DecFun, UserFun, UserAcc}) -> + case DecFun(EncBin) of + <<>> -> {DecFun, UserFun, UserAcc}; + Dec -> {DecFun, UserFun, UserFun(Dec, UserAcc)} end. -foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) -> - Md5 = crypto:hash_final(Md5Acc), - Acc; -foldl_decode(DecFun, Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> - foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc); -foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> - {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), - Md5 = crypto:hash_final(crypto:hash_update(Md5Acc, EncBin)), - Bin = DecFun(EncBin), - Fun(Bin, Acc); -foldl_decode(DecFun, Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) -> - foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc); -foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> - {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), - Bin = DecFun(EncBin), - Md5Acc2 = crypto:hash_update(Md5Acc, EncBin), - foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)). + +foldl_length(Bin, {Length, UserFun, UserAcc}) -> + BinSize = size(Bin), + case BinSize =< Length of + true -> + {Length - BinSize, UserFun, UserFun(Bin, UserAcc)}; + false -> + <<Trunc:BinSize/binary, _/binary>> = Bin, + throw({finished, UserFun(Trunc, UserAcc)}) + end. gzip_init(Options) -> case couch_util:get_value(compression_level, Options, 0) of @@ -192,23 +190,16 @@ identity_enc_dec_funs() -> fun() -> [] end }. -write(_Pid, <<>>) -> - ok; -write(Pid, Bin) -> - gen_server:call(Pid, {write, Bin}, infinity). - -init({Fd, OpenerPid, OpenerPriority, Options}) -> +init({Engine, OpenerPid, OpenerPriority, Options}) -> erlang:put(io_priority, OpenerPriority), {EncodingFun, EndEncodingFun} = case couch_util:get_value(encoding, Options, identity) of - identity -> - identity_enc_dec_funs(); - gzip -> - gzip_init(Options) + identity -> identity_enc_dec_funs(); + gzip -> gzip_init(Options) end, {ok, #stream{ - fd=Fd, + engine=Engine, opener_monitor=erlang:monitor(process, OpenerPid), md5=crypto:hash_init(md5), identity_md5=crypto:hash_init(md5), @@ -225,9 +216,8 @@ terminate(_Reason, _Stream) -> handle_call({write, Bin}, _From, Stream) -> BinSize = iolist_size(Bin), #stream{ - fd = Fd, + engine = Engine, written_len = WrittenLen, - written_pointers = Written, buffer_len = BufferLen, buffer_list = Buffer, max_buffer = Max, @@ -242,19 +232,18 @@ handle_call({write, Bin}, _From, Stream) -> [] -> % case where the encoder did some internal buffering % (zlib does it for example) + NewEngine = Engine, WrittenLen2 = WrittenLen, - Md5_2 = Md5, - Written2 = Written; + Md5_2 = Md5; WriteBin2 -> - {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2), + NewEngine = do_write(Engine, WriteBin2), WrittenLen2 = WrittenLen + iolist_size(WriteBin2), - Md5_2 = crypto:hash_update(Md5, WriteBin2), - Written2 = [{Pos, iolist_size(WriteBin2)}|Written] + Md5_2 = crypto:hash_update(Md5, WriteBin2) end, {reply, ok, Stream#stream{ + engine = NewEngine, written_len=WrittenLen2, - written_pointers=Written2, buffer_list=[], buffer_len=0, md5=Md5_2, @@ -268,10 +257,9 @@ handle_call({write, Bin}, _From, Stream) -> end; handle_call(close, _From, Stream) -> #stream{ - fd = Fd, + engine = Engine, opener_monitor = MonRef, written_len = WrittenLen, - written_pointers = Written, buffer_list = Buffer, md5 = Md5, identity_md5 = IdenMd5, @@ -285,12 +273,11 @@ handle_call(close, _From, Stream) -> Md5Final = crypto:hash_final(crypto:hash_update(Md5, WriteBin2)), Result = case WriteBin2 of [] -> - {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final}; + {do_finalize(Engine), WrittenLen, IdenLen, Md5Final, IdenMd5Final}; _ -> - {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2), - StreamInfo = lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]), + NewEngine = do_write(Engine, WriteBin2), StreamLen = WrittenLen + iolist_size(WriteBin2), - {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final} + {do_finalize(NewEngine), StreamLen, IdenLen, Md5Final, IdenMd5Final} end, erlang:demonitor(MonRef), {stop, normal, Result, Stream}. @@ -305,3 +292,17 @@ handle_info({'DOWN', Ref, _, _, _}, #stream{opener_monitor=Ref} = State) -> {stop, normal, State}; handle_info(_Info, State) -> {noreply, State}. + + +do_seek({Engine, EngineState}, Offset) -> + {ok, NewState} = Engine:seek(EngineState, Offset), + {Engine, NewState}. + +do_write({Engine, EngineState}, Data) -> + {ok, NewState} = Engine:write(EngineState, Data), + {Engine, NewState}. + +do_finalize({Engine, EngineState}) -> + {ok, NewState} = Engine:finalize(EngineState), + {Engine, NewState}. + |