summaryrefslogtreecommitdiff
path: root/src/couch/src/couch_stream.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch/src/couch_stream.erl')
-rw-r--r--src/couch/src/couch_stream.erl308
1 files changed, 0 insertions, 308 deletions
diff --git a/src/couch/src/couch_stream.erl b/src/couch/src/couch_stream.erl
deleted file mode 100644
index 2ab46d7e7..000000000
--- a/src/couch/src/couch_stream.erl
+++ /dev/null
@@ -1,308 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_stream).
--behaviour(gen_server).
--vsn(1).
-
-
--export([
- open/1,
- open/2,
- close/1,
-
- copy/2,
- write/2,
- to_disk_term/1,
-
- foldl/3,
- foldl/4,
- foldl_decode/5,
- range_foldl/5
-]).
-
--export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- code_change/3
-]).
-
-
--include_lib("couch/include/couch_db.hrl").
-
--define(DEFAULT_BUFFER_SIZE, 4096).
-
-
--record(stream, {
- engine,
- opener_monitor,
- written_pointers=[],
- buffer_list = [],
- buffer_len = 0,
- max_buffer,
- written_len = 0,
- md5,
- % md5 of the content without any transformation applied (e.g. compression)
- % needed for the attachment upload integrity check (ticket 558)
- identity_md5,
- identity_len = 0,
- encoding_fun,
- end_encoding_fun
-}).
-
-
-open({_StreamEngine, _StreamEngineState} = Engine) ->
- open(Engine, []).
-
-
-open({_StreamEngine, _StreamEngineState} = Engine, Options) ->
- gen_server:start_link(?MODULE, {Engine, self(), erlang:get(io_priority), Options}, []).
-
-
-close(Pid) ->
- gen_server:call(Pid, close, infinity).
-
-
-copy(Src, Dst) ->
- foldl(Src, fun(Bin, _) ->
- ok = write(Dst, Bin)
- end, ok).
-
-
-write(_Pid, <<>>) ->
- ok;
-write(Pid, Bin) ->
- gen_server:call(Pid, {write, Bin}, infinity).
-
-
-to_disk_term({Engine, EngineState}) ->
- Engine:to_disk_term(EngineState).
-
-
-foldl({Engine, EngineState}, Fun, Acc) ->
- Engine:foldl(EngineState, Fun, Acc).
-
-
-foldl(Engine, <<>>, Fun, Acc) ->
- foldl(Engine, Fun, Acc);
-foldl(Engine, Md5, UserFun, UserAcc) ->
- InitAcc = {couch_hash:md5_hash_init(), UserFun, UserAcc},
- {Md5Acc, _, OutAcc} = foldl(Engine, fun foldl_md5/2, InitAcc),
- Md5 = couch_hash:md5_hash_final(Md5Acc),
- OutAcc.
-
-
-foldl_decode(Engine, Md5, Enc, UserFun, UserAcc1) ->
- {DecDataFun, DecEndFun} = case Enc of
- gzip -> ungzip_init();
- identity -> identity_enc_dec_funs()
- end,
- InitAcc = {DecDataFun, UserFun, UserAcc1},
- {_, _, UserAcc2} = foldl(Engine, Md5, fun foldl_decode/2, InitAcc),
- DecEndFun(),
- UserAcc2.
-
-
-range_foldl(Engine, From, To, UserFun, UserAcc) when To >= From ->
- NewEngine = do_seek(Engine, From),
- InitAcc = {To - From, UserFun, UserAcc},
- try
- {_, _, UserAcc2} = foldl(NewEngine, fun foldl_length/2, InitAcc),
- UserAcc2
- catch
- throw:{finished, UserAcc3} ->
- UserAcc3
- end.
-
-
-foldl_md5(Bin, {Md5Acc, UserFun, UserAcc}) ->
- NewMd5Acc = couch_hash:md5_hash_update(Md5Acc, Bin),
- {NewMd5Acc, UserFun, UserFun(Bin, UserAcc)}.
-
-
-foldl_decode(EncBin, {DecFun, UserFun, UserAcc}) ->
- case DecFun(EncBin) of
- <<>> -> {DecFun, UserFun, UserAcc};
- Dec -> {DecFun, UserFun, UserFun(Dec, UserAcc)}
- end.
-
-
-foldl_length(Bin, {Length, UserFun, UserAcc}) ->
- BinSize = size(Bin),
- case BinSize =< Length of
- true ->
- {Length - BinSize, UserFun, UserFun(Bin, UserAcc)};
- false ->
- <<Trunc:Length/binary, _/binary>> = Bin,
- throw({finished, UserFun(Trunc, UserAcc)})
- end.
-
-gzip_init(Options) ->
- case couch_util:get_value(compression_level, Options, 0) of
- Lvl when Lvl >= 1 andalso Lvl =< 9 ->
- Z = zlib:open(),
- % 15 = ?MAX_WBITS (defined in the zlib module)
- % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
- ok = zlib:deflateInit(Z, Lvl, deflated, 16 + 15, 8, default),
- {
- fun(Data) ->
- zlib:deflate(Z, Data)
- end,
- fun() ->
- Last = zlib:deflate(Z, [], finish),
- ok = zlib:deflateEnd(Z),
- ok = zlib:close(Z),
- Last
- end
- };
- _ ->
- identity_enc_dec_funs()
- end.
-
-ungzip_init() ->
- Z = zlib:open(),
- zlib:inflateInit(Z, 16 + 15),
- {
- fun(Data) ->
- zlib:inflate(Z, Data)
- end,
- fun() ->
- ok = zlib:inflateEnd(Z),
- ok = zlib:close(Z)
- end
- }.
-
-identity_enc_dec_funs() ->
- {
- fun(Data) -> Data end,
- fun() -> [] end
- }.
-
-
-init({Engine, OpenerPid, OpenerPriority, Options}) ->
- erlang:put(io_priority, OpenerPriority),
- {EncodingFun, EndEncodingFun} =
- case couch_util:get_value(encoding, Options, identity) of
- identity -> identity_enc_dec_funs();
- gzip -> gzip_init(Options)
- end,
- {ok, #stream{
- engine=Engine,
- opener_monitor=erlang:monitor(process, OpenerPid),
- md5=couch_hash:md5_hash_init(),
- identity_md5=couch_hash:md5_hash_init(),
- encoding_fun=EncodingFun,
- end_encoding_fun=EndEncodingFun,
- max_buffer=couch_util:get_value(
- buffer_size, Options, ?DEFAULT_BUFFER_SIZE)
- }
- }.
-
-terminate(_Reason, _Stream) ->
- ok.
-
-handle_call({write, Bin}, _From, Stream) ->
- BinSize = iolist_size(Bin),
- #stream{
- engine = Engine,
- written_len = WrittenLen,
- buffer_len = BufferLen,
- buffer_list = Buffer,
- max_buffer = Max,
- md5 = Md5,
- identity_md5 = IdenMd5,
- identity_len = IdenLen,
- encoding_fun = EncodingFun} = Stream,
- if BinSize + BufferLen > Max ->
- WriteBin = lists:reverse(Buffer, [Bin]),
- IdenMd5_2 = couch_hash:md5_hash_update(IdenMd5, WriteBin),
- case EncodingFun(WriteBin) of
- [] ->
- % case where the encoder did some internal buffering
- % (zlib does it for example)
- NewEngine = Engine,
- WrittenLen2 = WrittenLen,
- Md5_2 = Md5;
- WriteBin2 ->
- NewEngine = do_write(Engine, WriteBin2),
- WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
- Md5_2 = couch_hash:md5_hash_update(Md5, WriteBin2)
- end,
-
- {reply, ok, Stream#stream{
- engine = NewEngine,
- written_len=WrittenLen2,
- buffer_list=[],
- buffer_len=0,
- md5=Md5_2,
- identity_md5=IdenMd5_2,
- identity_len=IdenLen + BinSize}, hibernate};
- true ->
- {reply, ok, Stream#stream{
- buffer_list=[Bin|Buffer],
- buffer_len=BufferLen + BinSize,
- identity_len=IdenLen + BinSize}}
- end;
-handle_call(close, _From, Stream) ->
- #stream{
- engine = Engine,
- opener_monitor = MonRef,
- written_len = WrittenLen,
- buffer_list = Buffer,
- md5 = Md5,
- identity_md5 = IdenMd5,
- identity_len = IdenLen,
- encoding_fun = EncodingFun,
- end_encoding_fun = EndEncodingFun} = Stream,
-
- WriteBin = lists:reverse(Buffer),
- IdenMd5Final = couch_hash:md5_hash_final(couch_hash:md5_hash_update(IdenMd5, WriteBin)),
- WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(),
- Md5Final = couch_hash:md5_hash_final(couch_hash:md5_hash_update(Md5, WriteBin2)),
- Result = case WriteBin2 of
- [] ->
- {do_finalize(Engine), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
- _ ->
- NewEngine = do_write(Engine, WriteBin2),
- StreamLen = WrittenLen + iolist_size(WriteBin2),
- {do_finalize(NewEngine), StreamLen, IdenLen, Md5Final, IdenMd5Final}
- end,
- erlang:demonitor(MonRef),
- {stop, normal, Result, Stream}.
-
-handle_cast(_Msg, State) ->
- {noreply,State}.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-handle_info({'DOWN', Ref, _, _, _}, #stream{opener_monitor=Ref} = State) ->
- {stop, normal, State};
-handle_info(_Info, State) ->
- {noreply, State}.
-
-
-do_seek({Engine, EngineState}, Offset) ->
- {ok, NewState} = Engine:seek(EngineState, Offset),
- {Engine, NewState}.
-
-do_write({Engine, EngineState}, Data) ->
- {ok, NewState} = Engine:write(EngineState, Data),
- {Engine, NewState}.
-
-do_finalize({Engine, EngineState}) ->
- {ok, NewState} = Engine:finalize(EngineState),
- {Engine, NewState}.
-