diff options
Diffstat (limited to 'src/couch/src/couch_att.erl')
-rw-r--r-- | src/couch/src/couch_att.erl | 146 |
1 file changed, 96 insertions, 50 deletions
diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl index 3380f5739..16edd66ce 100644 --- a/src/couch/src/couch_att.erl +++ b/src/couch/src/couch_att.erl @@ -152,9 +152,31 @@ data_prop() | encoding_prop() ]. +-type disk_att_v1() :: { + Name :: binary(), + Type :: binary(), + Sp :: any(), + AttLen :: non_neg_integer(), + RevPos :: non_neg_integer(), + Md5 :: binary() +}. + +-type disk_att_v2() :: { + Name :: binary(), + Type :: binary(), + Sp :: any(), + AttLen :: non_neg_integer(), + DiskLen :: non_neg_integer(), + RevPos :: non_neg_integer(), + Md5 :: binary(), + Enc :: identity | gzip +}. + +-type disk_att_v3() :: {Base :: tuple(), Extended :: list()}. --type att() :: #att{} | attachment(). +-type disk_att() :: disk_att_v1() | disk_att_v2() | disk_att_v3(). +-type att() :: #att{} | attachment() | disk_att(). new() -> %% We construct a record by default for compatability. This will be @@ -297,11 +319,12 @@ size_info(Atts) -> %% as safe as possible, avoiding the need for complicated disk versioning %% schemes. 
to_disk_term(#att{} = Att) -> - {_, StreamIndex} = fetch(data, Att), + {stream, StreamEngine} = fetch(data, Att), + {ok, Sp} = couch_stream:to_disk_term(StreamEngine), { fetch(name, Att), fetch(type, Att), - StreamIndex, + Sp, fetch(att_len, Att), fetch(disk_len, Att), fetch(revpos, Att), @@ -314,9 +337,13 @@ to_disk_term(Att) -> fun (data, {Props, Values}) -> case lists:keytake(data, 1, Props) of - {value, {_, {_Fd, Sp}}, Other} -> {Other, [Sp | Values]}; - {value, {_, Value}, Other} -> {Other, [Value | Values]}; - false -> {Props, [undefined |Values ]} + {value, {_, {stream, StreamEngine}}, Other} -> + {ok, Sp} = couch_stream:to_disk_term(StreamEngine), + {Other, [Sp | Values]}; + {value, {_, Value}, Other} -> + {Other, [Value | Values]}; + false -> + {Props, [undefined | Values]} end; (Key, {Props, Values}) -> case lists:keytake(Key, 1, Props) of @@ -337,9 +364,11 @@ to_disk_term(Att) -> %% compression to remove these sorts of common bits (block level compression %% with something like a shared dictionary that is checkpointed every now and %% then). 
-from_disk_term(Fd, {Base, Extended}) when is_tuple(Base), is_list(Extended) -> - store(Extended, from_disk_term(Fd, Base)); -from_disk_term(Fd, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) -> +from_disk_term(StreamSrc, {Base, Extended}) + when is_tuple(Base), is_list(Extended) -> + store(Extended, from_disk_term(StreamSrc, Base)); +from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) -> + {ok, Stream} = open_stream(StreamSrc, Sp), #att{ name=Name, type=Type, @@ -347,10 +376,11 @@ from_disk_term(Fd, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) -> disk_len=DiskLen, md5=Md5, revpos=RevPos, - data={Fd,Sp}, + data={stream, Stream}, encoding=upgrade_encoding(Enc) }; -from_disk_term(Fd, {Name,Type,Sp,AttLen,RevPos,Md5}) -> +from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,RevPos,Md5}) -> + {ok, Stream} = open_stream(StreamSrc, Sp), #att{ name=Name, type=Type, @@ -358,9 +388,10 @@ from_disk_term(Fd, {Name,Type,Sp,AttLen,RevPos,Md5}) -> disk_len=AttLen, md5=Md5, revpos=RevPos, - data={Fd,Sp} + data={stream, Stream} }; -from_disk_term(Fd, {Name,{Type,Sp,AttLen}}) -> +from_disk_term(StreamSrc, {Name,{Type,Sp,AttLen}}) -> + {ok, Stream} = open_stream(StreamSrc, Sp), #att{ name=Name, type=Type, @@ -368,7 +399,7 @@ from_disk_term(Fd, {Name,{Type,Sp,AttLen}}) -> disk_len=AttLen, md5= <<>>, revpos=0, - data={Fd,Sp} + data={stream, Stream} }. @@ -490,37 +521,20 @@ to_json(Att, OutputData, DataToFollow, ShowEncoding) -> {Name, {Props ++ DigestProp ++ DataProps ++ EncodingProps ++ HeadersProp}}. -flush(Fd, Att) -> - flush_data(Fd, fetch(data, Att), Att). +flush(Db, Att) -> + flush_data(Db, fetch(data, Att), Att). 
-flush_data(Fd, {stream, {couch_bt_engine_stream, {OtherFd, StreamPointer}}}, - Att) -> - flush_data(Fd, {OtherFd, StreamPointer}, Att); -flush_data(Fd, {Fd0, _}, Att) when Fd0 == Fd -> - % already written to our file, nothing to write - Att; -flush_data(Fd, {OtherFd, StreamPointer}, Att) -> - [InMd5, InDiskLen] = fetch([md5, disk_len], Att), - {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} = - couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd), - couch_db:check_md5(IdentityMd5, InMd5), - store([ - {data, {Fd, NewStreamData}}, - {md5, Md5}, - {att_len, Len}, - {disk_len, InDiskLen} - ], Att); -flush_data(Fd, Data, Att) when is_binary(Data) -> - couch_db:with_stream(Fd, Att, fun(OutputStream) -> +flush_data(Db, Data, Att) when is_binary(Data) -> + couch_db:with_stream(Db, Att, fun(OutputStream) -> couch_stream:write(OutputStream, Data) end); -flush_data(Fd, Fun, Att) when is_function(Fun) -> +flush_data(Db, Fun, Att) when is_function(Fun) -> AttName = fetch(name, Att), MaxAttSize = max_attachment_size(), case fetch(att_len, Att) of undefined -> - couch_db:with_stream(Fd, Att, fun(OutputStream) -> + couch_db:with_stream(Db, Att, fun(OutputStream) -> % Fun(MaxChunkSize, WriterFun) must call WriterFun % once for each chunk of the attachment, Fun(4096, @@ -545,11 +559,11 @@ flush_data(Fd, Fun, Att) when is_function(Fun) -> end); AttLen -> validate_attachment_size(AttName, AttLen, MaxAttSize), - couch_db:with_stream(Fd, Att, fun(OutputStream) -> + couch_db:with_stream(Db, Att, fun(OutputStream) -> write_streamed_attachment(OutputStream, Fun, AttLen) end) end; -flush_data(Fd, {follows, Parser, Ref}, Att) -> +flush_data(Db, {follows, Parser, Ref}, Att) -> ParserRef = erlang:monitor(process, Parser), Fun = fun() -> Parser ! 
{get_bytes, Ref, self()}, @@ -563,9 +577,23 @@ flush_data(Fd, {follows, Parser, Ref}, Att) -> end end, try - flush_data(Fd, Fun, store(data, Fun, Att)) + flush_data(Db, Fun, store(data, Fun, Att)) after erlang:demonitor(ParserRef, [flush]) + end; +flush_data(Db, {stream, StreamEngine}, Att) -> + case couch_db:is_active_stream(Db, StreamEngine) of + true -> + % Already written + Att; + false -> + NewAtt = couch_db:with_stream(Db, Att, fun(OutputStream) -> + couch_stream:copy(StreamEngine, OutputStream) + end), + InMd5 = fetch(md5, Att), + OutMd5 = fetch(md5, NewAtt), + couch_util:check_md5(OutMd5, InMd5), + NewAtt end. @@ -594,9 +622,9 @@ foldl(Att, Fun, Acc) -> foldl(Bin, _Att, Fun, Acc) when is_binary(Bin) -> Fun(Bin, Acc); -foldl({Fd, Sp}, Att, Fun, Acc) -> +foldl({stream, StreamEngine}, Att, Fun, Acc) -> Md5 = fetch(md5, Att), - couch_stream:foldl(Fd, Sp, Md5, Fun, Acc); + couch_stream:foldl(StreamEngine, Md5, Fun, Acc); foldl(DataFun, Att, Fun, Acc) when is_function(DataFun) -> Len = fetch(att_len, Att), fold_streamed_data(DataFun, Len, Fun, Acc); @@ -621,14 +649,15 @@ foldl({follows, Parser, Ref}, Att, Fun, Acc) -> range_foldl(Att, From, To, Fun, Acc) -> - {Fd, Sp} = fetch(data, Att), - couch_stream:range_foldl(Fd, Sp, From, To, Fun, Acc). + {stream, StreamEngine} = fetch(data, Att), + couch_stream:range_foldl(StreamEngine, From, To, Fun, Acc). foldl_decode(Att, Fun, Acc) -> case fetch([data, encoding], Att) of - [{Fd, Sp}, Enc] -> - couch_stream:foldl_decode(Fd, Sp, fetch(md5, Att), Enc, Fun, Acc); + [{stream, StreamEngine}, Enc] -> + couch_stream:foldl_decode( + StreamEngine, fetch(md5, Att), Enc, Fun, Acc); [Fun2, identity] -> fold_streamed_data(Fun2, fetch(att_len, Att), Fun, Acc) end. 
@@ -642,7 +671,7 @@ to_binary(Bin, _Att) when is_binary(Bin) -> Bin; to_binary(Iolist, _Att) when is_list(Iolist) -> iolist_to_binary(Iolist); -to_binary({_Fd,_Sp}, Att) -> +to_binary({stream, _StreamEngine}, Att) -> iolist_to_binary( lists:reverse(foldl(Att, fun(Bin,Acc) -> [Bin|Acc] end, [])) ); @@ -718,9 +747,25 @@ validate_attachment_size(_AttName, _AttSize, _MAxAttSize) -> ok. +open_stream(StreamSrc, Data) -> + case couch_db:is_db(StreamSrc) of + true -> + couch_db:open_read_stream(StreamSrc, Data); + false -> + case is_function(StreamSrc, 1) of + true -> + StreamSrc(Data); + false -> + erlang:error({invalid_stream_source, StreamSrc}) + end + end. + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +% Eww... +-include("couch_bt_engine.hrl"). %% Test utilities @@ -775,7 +820,7 @@ attachment_disk_term_test_() -> {disk_len, 0}, {md5, <<212,29,140,217,143,0,178,4,233,128,9,152,236,248,66,126>>}, {revpos, 4}, - {data, {fake_fd, fake_sp}}, + {data, {stream, {couch_bt_engine_stream, {fake_fd, fake_sp}}}}, {encoding, identity} ]), BaseDiskTerm = { @@ -789,11 +834,12 @@ attachment_disk_term_test_() -> Headers = [{<<"X-Foo">>, <<"bar">>}], ExtendedAttachment = store(headers, Headers, BaseAttachment), ExtendedDiskTerm = {BaseDiskTerm, [{headers, Headers}]}, + FakeDb = test_util:fake_db([{engine, {couch_bt_engine, #st{fd=fake_fd}}}]), {"Disk term tests", [ ?_assertEqual(BaseDiskTerm, to_disk_term(BaseAttachment)), - ?_assertEqual(BaseAttachment, from_disk_term(fake_fd, BaseDiskTerm)), + ?_assertEqual(BaseAttachment, from_disk_term(FakeDb, BaseDiskTerm)), ?_assertEqual(ExtendedDiskTerm, to_disk_term(ExtendedAttachment)), - ?_assertEqual(ExtendedAttachment, from_disk_term(fake_fd, ExtendedDiskTerm)) + ?_assertEqual(ExtendedAttachment, from_disk_term(FakeDb, ExtendedDiskTerm)) ]}. |