summaryrefslogtreecommitdiff
path: root/src/couch/src/couch_stream.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch/src/couch_stream.erl')
-rw-r--r--src/couch/src/couch_stream.erl255
1 files changed, 128 insertions, 127 deletions
diff --git a/src/couch/src/couch_stream.erl b/src/couch/src/couch_stream.erl
index eb64484df..83b0611eb 100644
--- a/src/couch/src/couch_stream.erl
+++ b/src/couch/src/couch_stream.erl
@@ -14,21 +14,39 @@
-behaviour(gen_server).
-vsn(1).
-% public API
--export([open/1, open/2, close/1]).
--export([foldl/4, foldl/5, foldl_decode/6, range_foldl/6]).
--export([copy_to_new_stream/3, write/2]).
-% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_cast/2, handle_call/3, handle_info/2]).
+-export([
+ open/1,
+ open/2,
+ close/1,
+
+ copy/2,
+ write/2,
+ to_disk_term/1,
+
+ foldl/3,
+ foldl/4,
+ foldl_decode/5,
+ range_foldl/5
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
-include_lib("couch/include/couch_db.hrl").
-define(DEFAULT_BUFFER_SIZE, 4096).
--record(stream,
- {fd = 0,
+
+-record(stream, {
+ engine,
opener_monitor,
written_pointers=[],
buffer_list = [],
@@ -42,114 +60,94 @@
identity_len = 0,
encoding_fun,
end_encoding_fun
- }).
+}).
+
+open({_StreamEngine, _StreamEngineState} = Engine) ->
+ open(Engine, []).
-%%% Interface functions %%%
-open(Fd) ->
- open(Fd, []).
+open({_StreamEngine, _StreamEngineState} = Engine, Options) ->
+ gen_server:start_link(?MODULE, {Engine, self(), erlang:get(io_priority), Options}, []).
-open(Fd, Options) ->
- gen_server:start_link(couch_stream, {Fd, self(), erlang:get(io_priority), Options}, []).
close(Pid) ->
gen_server:call(Pid, close, infinity).
-copy_to_new_stream(Fd, PosList, DestFd) ->
- {ok, Dest} = open(DestFd),
- foldl(Fd, PosList,
- fun(Bin, _) ->
- ok = write(Dest, Bin)
- end, ok),
- close(Dest).
-
-foldl(_Fd, [], _Fun, Acc) ->
- Acc;
-foldl(Fd, [Pos|Rest], Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
-
-foldl(Fd, PosList, <<>>, Fun, Acc) ->
- foldl(Fd, PosList, Fun, Acc);
-foldl(Fd, PosList, Md5, Fun, Acc) ->
- foldl(Fd, PosList, Md5, crypto:hash_init(md5), Fun, Acc).
-
-foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
+
+copy(Src, Dst) ->
+ foldl(Src, fun(Bin, _) ->
+ ok = write(Dst, Bin)
+ end, ok).
+
+
+write(_Pid, <<>>) ->
+ ok;
+write(Pid, Bin) ->
+ gen_server:call(Pid, {write, Bin}, infinity).
+
+
+to_disk_term({Engine, EngineState}) ->
+ Engine:to_disk_term(EngineState).
+
+
+foldl({Engine, EngineState}, Fun, Acc) ->
+ Engine:foldl(EngineState, Fun, Acc).
+
+
+foldl(Engine, <<>>, Fun, Acc) ->
+ foldl(Engine, Fun, Acc);
+foldl(Engine, Md5, UserFun, UserAcc) ->
+ InitAcc = {crypto:hash_init(md5), UserFun, UserAcc},
+ {Md5Acc, _, OutAcc} = foldl(Engine, fun foldl_md5/2, InitAcc),
+ Md5 = crypto:hash_final(Md5Acc),
+ OutAcc.
+
+
+foldl_decode(Engine, Md5, Enc, UserFun, UserAcc1) ->
{DecDataFun, DecEndFun} = case Enc of
- gzip ->
- ungzip_init();
- identity ->
- identity_enc_dec_funs()
+ gzip -> ungzip_init();
+ identity -> identity_enc_dec_funs()
end,
- Result = foldl_decode(
- DecDataFun, Fd, PosList, Md5, crypto:hash_init(md5), Fun, Acc
- ),
+ InitAcc = {DecDataFun, UserFun, UserAcc1},
+ {_, _, UserAcc2} = foldl(Engine, Md5, fun foldl_decode/2, InitAcc),
DecEndFun(),
- Result.
+ UserAcc2.
+
+
+range_foldl(Engine, From, To, UserFun, UserAcc) when To >= From ->
+ NewEngine = do_seek(Engine, From),
+ InitAcc = {To - From, UserFun, UserAcc},
+ try
+ {_, _, UserAcc2} = foldl(NewEngine, fun foldl_length/2, InitAcc),
+ UserAcc2
+ catch
+ throw:{finished, UserAcc3} ->
+ UserAcc3
+ end.
-foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
- Md5 = crypto:hash_final(Md5Acc),
- Acc;
-foldl(Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> % 0110 UPGRADE CODE
- foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc);
-foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- Md5 = crypto:hash_final(crypto:hash_update(Md5Acc, Bin)),
- Fun(Bin, Acc);
-foldl(Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
- foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
-foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- foldl(Fd, Rest, Md5, crypto:hash_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)).
-
-range_foldl(Fd, PosList, From, To, Fun, Acc) ->
- range_foldl(Fd, PosList, From, To, 0, Fun, Acc).
-
-range_foldl(_Fd, _PosList, _From, To, Off, _Fun, Acc) when Off >= To ->
- Acc;
-range_foldl(Fd, [Pos|Rest], From, To, Off, Fun, Acc) when is_integer(Pos) -> % old-style attachment
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- range_foldl(Fd, [{Pos, iolist_size(Bin)}] ++ Rest, From, To, Off, Fun, Acc);
-range_foldl(Fd, [{_Pos, Size}|Rest], From, To, Off, Fun, Acc) when From > Off + Size ->
- range_foldl(Fd, Rest, From, To, Off + Size, Fun, Acc);
-range_foldl(Fd, [{Pos, Size}|Rest], From, To, Off, Fun, Acc) ->
- {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- Bin1 = if
- From =< Off andalso To >= Off + Size -> Bin; %% the whole block is covered
- true ->
- PrefixLen = clip(From - Off, 0, Size),
- PostfixLen = clip(Off + Size - To, 0, Size),
- MatchLen = Size - PrefixLen - PostfixLen,
- <<_Prefix:PrefixLen/binary,Match:MatchLen/binary,_Postfix:PostfixLen/binary>> = iolist_to_binary(Bin),
- Match
- end,
- range_foldl(Fd, Rest, From, To, Off + Size, Fun, Fun(Bin1, Acc)).
-clip(Value, Lo, Hi) ->
- if
- Value < Lo -> Lo;
- Value > Hi -> Hi;
- true -> Value
+foldl_md5(Bin, {Md5Acc, UserFun, UserAcc}) ->
+ NewMd5Acc = crypto:hash_update(Md5Acc, Bin),
+ {NewMd5Acc, UserFun, UserFun(Bin, UserAcc)}.
+
+
+foldl_decode(EncBin, {DecFun, UserFun, UserAcc}) ->
+ case DecFun(EncBin) of
+ <<>> -> {DecFun, UserFun, UserAcc};
+ Dec -> {DecFun, UserFun, UserFun(Dec, UserAcc)}
end.
-foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
- Md5 = crypto:hash_final(Md5Acc),
- Acc;
-foldl_decode(DecFun, Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) ->
- foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc);
-foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
- {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
- Md5 = crypto:hash_final(crypto:hash_update(Md5Acc, EncBin)),
- Bin = DecFun(EncBin),
- Fun(Bin, Acc);
-foldl_decode(DecFun, Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
- foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
-foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
- {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
- Bin = DecFun(EncBin),
- Md5Acc2 = crypto:hash_update(Md5Acc, EncBin),
- foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
+
+foldl_length(Bin, {Length, UserFun, UserAcc}) ->
+ BinSize = size(Bin),
+ case BinSize =< Length of
+ true ->
+ {Length - BinSize, UserFun, UserFun(Bin, UserAcc)};
+ false ->
+ <<Trunc:BinSize/binary, _/binary>> = Bin,
+ throw({finished, UserFun(Trunc, UserAcc)})
+ end.
gzip_init(Options) ->
case couch_util:get_value(compression_level, Options, 0) of
@@ -192,23 +190,16 @@ identity_enc_dec_funs() ->
fun() -> [] end
}.
-write(_Pid, <<>>) ->
- ok;
-write(Pid, Bin) ->
- gen_server:call(Pid, {write, Bin}, infinity).
-
-init({Fd, OpenerPid, OpenerPriority, Options}) ->
+init({Engine, OpenerPid, OpenerPriority, Options}) ->
erlang:put(io_priority, OpenerPriority),
{EncodingFun, EndEncodingFun} =
case couch_util:get_value(encoding, Options, identity) of
- identity ->
- identity_enc_dec_funs();
- gzip ->
- gzip_init(Options)
+ identity -> identity_enc_dec_funs();
+ gzip -> gzip_init(Options)
end,
{ok, #stream{
- fd=Fd,
+ engine=Engine,
opener_monitor=erlang:monitor(process, OpenerPid),
md5=crypto:hash_init(md5),
identity_md5=crypto:hash_init(md5),
@@ -225,9 +216,8 @@ terminate(_Reason, _Stream) ->
handle_call({write, Bin}, _From, Stream) ->
BinSize = iolist_size(Bin),
#stream{
- fd = Fd,
+ engine = Engine,
written_len = WrittenLen,
- written_pointers = Written,
buffer_len = BufferLen,
buffer_list = Buffer,
max_buffer = Max,
@@ -242,19 +232,18 @@ handle_call({write, Bin}, _From, Stream) ->
[] ->
% case where the encoder did some internal buffering
% (zlib does it for example)
+ NewEngine = Engine,
WrittenLen2 = WrittenLen,
- Md5_2 = Md5,
- Written2 = Written;
+ Md5_2 = Md5;
WriteBin2 ->
- {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
+ NewEngine = do_write(Engine, WriteBin2),
WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
- Md5_2 = crypto:hash_update(Md5, WriteBin2),
- Written2 = [{Pos, iolist_size(WriteBin2)}|Written]
+ Md5_2 = crypto:hash_update(Md5, WriteBin2)
end,
{reply, ok, Stream#stream{
+ engine = NewEngine,
written_len=WrittenLen2,
- written_pointers=Written2,
buffer_list=[],
buffer_len=0,
md5=Md5_2,
@@ -268,10 +257,9 @@ handle_call({write, Bin}, _From, Stream) ->
end;
handle_call(close, _From, Stream) ->
#stream{
- fd = Fd,
+ engine = Engine,
opener_monitor = MonRef,
written_len = WrittenLen,
- written_pointers = Written,
buffer_list = Buffer,
md5 = Md5,
identity_md5 = IdenMd5,
@@ -285,12 +273,11 @@ handle_call(close, _From, Stream) ->
Md5Final = crypto:hash_final(crypto:hash_update(Md5, WriteBin2)),
Result = case WriteBin2 of
[] ->
- {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
+ {do_finalize(Engine), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
_ ->
- {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
- StreamInfo = lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]),
+ NewEngine = do_write(Engine, WriteBin2),
StreamLen = WrittenLen + iolist_size(WriteBin2),
- {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final}
+ {do_finalize(NewEngine), StreamLen, IdenLen, Md5Final, IdenMd5Final}
end,
erlang:demonitor(MonRef),
{stop, normal, Result, Stream}.
@@ -305,3 +292,17 @@ handle_info({'DOWN', Ref, _, _, _}, #stream{opener_monitor=Ref} = State) ->
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.
+
+
+do_seek({Engine, EngineState}, Offset) ->
+ {ok, NewState} = Engine:seek(EngineState, Offset),
+ {Engine, NewState}.
+
+do_write({Engine, EngineState}, Data) ->
+ {ok, NewState} = Engine:write(EngineState, Data),
+ {Engine, NewState}.
+
+do_finalize({Engine, EngineState}) ->
+ {ok, NewState} = Engine:finalize(EngineState),
+ {Engine, NewState}.
+