Diffstat (limited to 'src/couch/src/couch_httpd_multipart.erl')
-rw-r--r-- | src/couch/src/couch_httpd_multipart.erl | 359
1 file changed, 0 insertions, 359 deletions
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl
deleted file mode 100644
index 11ee6790e..000000000
--- a/src/couch/src/couch_httpd_multipart.erl
+++ /dev/null
@@ -1,359 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_httpd_multipart).
-
--export([
-    abort_multipart_stream/1,
-    decode_multipart_stream/3,
-    encode_multipart_stream/5,
-    length_multipart_stream/3,
-    num_mp_writers/0,
-    num_mp_writers/1
-]).
-
--include_lib("couch/include/couch_db.hrl").
-
-decode_multipart_stream(ContentType, DataFun, Ref) ->
-    Parent = self(),
-    NumMpWriters = num_mp_writers(),
-    {Parser, ParserRef} = spawn_monitor(fun() ->
-        ParentRef = erlang:monitor(process, Parent),
-        put(mp_parent_ref, ParentRef),
-        num_mp_writers(NumMpWriters),
-        {<<"--", _/binary>>, _, _} = couch_httpd:parse_multipart_request(
-            ContentType,
-            DataFun,
-            fun(Next) -> mp_parse_doc(Next, []) end
-        ),
-        unlink(Parent)
-    end),
-    Parser ! {get_doc_bytes, Ref, self()},
-    receive
-        {started_open_doc_revs, NewRef} ->
-            %% FIXME: How to remove the knowledge about this message?
-            {{started_open_doc_revs, NewRef}, Parser, ParserRef};
-        {doc_bytes, Ref, DocBytes} ->
-            {{doc_bytes, Ref, DocBytes}, Parser, ParserRef};
-        {'DOWN', ParserRef, _, _, normal} ->
-            ok;
-        {'DOWN', ParserRef, process, Parser, {{nocatch, {Error, Msg}}, _}} ->
-            couch_log:error(
-                "Multipart streamer ~p died with reason ~p",
-                [ParserRef, Msg]
-            ),
-            throw({Error, Msg});
-        {'DOWN', ParserRef, _, _, Reason} ->
-            couch_log:error(
-                "Multipart streamer ~p died with reason ~p",
-                [ParserRef, Reason]
-            ),
-            throw({error, Reason})
-    end.
-
-mp_parse_doc({headers, H}, []) ->
-    case couch_util:get_value("content-type", H) of
-        {"application/json", _} ->
-            fun(Next) ->
-                mp_parse_doc(Next, [])
-            end;
-        _ ->
-            throw({bad_ctype, <<"Content-Type must be application/json">>})
-    end;
-mp_parse_doc({body, Bytes}, AccBytes) ->
-    fun(Next) ->
-        mp_parse_doc(Next, [Bytes | AccBytes])
-    end;
-mp_parse_doc(body_end, AccBytes) ->
-    receive
-        {get_doc_bytes, Ref, From} ->
-            From ! {doc_bytes, Ref, lists:reverse(AccBytes)}
-    end,
-    fun(Next) ->
-        mp_parse_atts(Next, {Ref, [], 0, orddict:new(), []})
-    end.
-
-mp_parse_atts({headers, _}, Acc) ->
-    fun(Next) -> mp_parse_atts(Next, Acc) end;
-mp_parse_atts(body_end, Acc) ->
-    fun(Next) -> mp_parse_atts(Next, Acc) end;
-mp_parse_atts({body, Bytes}, {Ref, Chunks, Offset, Counters, Waiting}) ->
-    case maybe_send_data({Ref, Chunks ++ [Bytes], Offset, Counters, Waiting}) of
-        abort_parsing ->
-            fun(Next) -> mp_abort_parse_atts(Next, nil) end;
-        NewAcc ->
-            fun(Next) -> mp_parse_atts(Next, NewAcc) end
-    end;
-mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
-    N = num_mp_writers(),
-    M = length(Counters),
-    case (M == N) andalso Chunks == [] of
-        true ->
-            ok;
-        false ->
-            ParentRef = get(mp_parent_ref),
-            receive
-                abort_parsing ->
-                    ok;
-                {hello_from_writer, Ref, WriterPid} ->
-                    NewCounters = handle_hello(WriterPid, Counters),
-                    mp_parse_atts(eof, {Ref, Chunks, Offset, NewCounters, Waiting});
-                {get_bytes, Ref, From} ->
-                    C2 = update_writer(From, Counters),
-                    case maybe_send_data({Ref, Chunks, Offset, C2, [From | Waiting]}) of
-                        abort_parsing ->
-                            ok;
-                        NewAcc ->
-                            mp_parse_atts(eof, NewAcc)
-                    end;
-                {'DOWN', ParentRef, _, _, _} ->
-                    exit(mp_reader_coordinator_died);
-                {'DOWN', WriterRef, _, WriterPid, _} ->
-                    case remove_writer(WriterPid, WriterRef, Counters) of
-                        abort_parsing ->
-                            ok;
-                        C2 ->
-                            NewAcc = {Ref, Chunks, Offset, C2, Waiting -- [WriterPid]},
-                            mp_parse_atts(eof, NewAcc)
-                    end
-            after att_writer_timeout() ->
-                ok
-            end
-    end.
-
-mp_abort_parse_atts(eof, _) ->
-    ok;
-mp_abort_parse_atts(_, _) ->
-    fun(Next) -> mp_abort_parse_atts(Next, nil) end.
-
-maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
-    receive
-        {hello_from_writer, Ref, WriterPid} ->
-            NewCounters = handle_hello(WriterPid, Counters),
-            maybe_send_data({Ref, Chunks, Offset, NewCounters, Waiting});
-        {get_bytes, Ref, From} ->
-            NewCounters = update_writer(From, Counters),
-            maybe_send_data({Ref, Chunks, Offset, NewCounters, [From | Waiting]})
-    after 0 ->
-        % reply to as many writers as possible
-        NewWaiting = lists:filter(
-            fun(Writer) ->
-                {_, WhichChunk} = orddict:fetch(Writer, Counters),
-                ListIndex = WhichChunk - Offset,
-                if
-                    ListIndex =< length(Chunks) ->
-                        Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)},
-                        false;
-                    true ->
-                        true
-                end
-            end,
-            Waiting
-        ),
-
-        % check if we can drop a chunk from the head of the list
-        SmallestIndex =
-            case Counters of
-                [] ->
-                    0;
-                _ ->
-                    lists:min([C || {_WPid, {_WRef, C}} <- Counters])
-            end,
-        Size = length(Counters),
-        N = num_mp_writers(),
-        if
-            Size == N andalso SmallestIndex == (Offset + 1) ->
-                NewChunks = tl(Chunks),
-                NewOffset = Offset + 1;
-            true ->
-                NewChunks = Chunks,
-                NewOffset = Offset
-        end,
-
-        % we should wait for a writer if no one has written the last chunk
-        LargestIndex = lists:max([0] ++ [C || {_WPid, {_WRef, C}} <- Counters]),
-        if
-            LargestIndex >= (Offset + length(Chunks)) ->
-                % someone has written all possible chunks, keep moving
-                {Ref, NewChunks, NewOffset, Counters, NewWaiting};
-            true ->
-                ParentRef = get(mp_parent_ref),
-                receive
-                    abort_parsing ->
-                        abort_parsing;
-                    {'DOWN', ParentRef, _, _, _} ->
-                        exit(mp_reader_coordinator_died);
-                    {'DOWN', WriterRef, _, WriterPid, _} ->
-                        case remove_writer(WriterPid, WriterRef, Counters) of
-                            abort_parsing ->
-                                abort_parsing;
-                            C2 ->
-                                RestWaiting = NewWaiting -- [WriterPid],
-                                NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting},
-                                maybe_send_data(NewAcc)
-                        end;
-                    {hello_from_writer, Ref, WriterPid} ->
-                        C2 = handle_hello(WriterPid, Counters),
-                        maybe_send_data({Ref, NewChunks, NewOffset, C2, Waiting});
-                    {get_bytes, Ref, X} ->
-                        C2 = update_writer(X, Counters),
-                        maybe_send_data({Ref, NewChunks, NewOffset, C2, [X | NewWaiting]})
-                after att_writer_timeout() ->
-                    abort_parsing
-                end
-        end
-    end.
-
-handle_hello(WriterPid, Counters) ->
-    WriterRef = erlang:monitor(process, WriterPid),
-    orddict:store(WriterPid, {WriterRef, 0}, Counters).
-
-update_writer(WriterPid, Counters) ->
-    case orddict:find(WriterPid, Counters) of
-        {ok, {WriterRef, Count}} ->
-            orddict:store(WriterPid, {WriterRef, Count + 1}, Counters);
-        error ->
-            WriterRef = erlang:monitor(process, WriterPid),
-            orddict:store(WriterPid, {WriterRef, 1}, Counters)
-    end.
-
-remove_writer(WriterPid, WriterRef, Counters) ->
-    case orddict:find(WriterPid, Counters) of
-        {ok, {WriterRef, _}} ->
-            case num_mp_writers() of
-                N when N > 1 ->
-                    num_mp_writers(N - 1),
-                    orddict:erase(WriterPid, Counters);
-                _ ->
-                    abort_parsing
-            end;
-        {ok, _} ->
-            % We got a different ref fired for a known worker
-            abort_parsing;
-        error ->
-            % Unknown worker pid?
-            abort_parsing
-    end.
-
-num_mp_writers(N) ->
-    erlang:put(mp_att_writers, N).
-
-num_mp_writers() ->
-    case erlang:get(mp_att_writers) of
-        undefined -> 1;
-        Count -> Count
-    end.
-
-att_writer_timeout() ->
-    config:get_integer("couchdb", "attachment_writer_timeout", 300000).
-
-encode_multipart_stream(_Boundary, JsonBytes, [], WriteFun, _AttFun) ->
-    WriteFun(JsonBytes);
-encode_multipart_stream(Boundary, JsonBytes, Atts, WriteFun, AttFun) ->
-    WriteFun([
-        <<"--", Boundary/binary, "\r\nContent-Type: application/json\r\n\r\n">>,
-        JsonBytes,
-        <<"\r\n--", Boundary/binary>>
-    ]),
-    atts_to_mp(Atts, Boundary, WriteFun, AttFun).
-
-atts_to_mp([], _Boundary, WriteFun, _AttFun) ->
-    WriteFun(<<"--">>);
-atts_to_mp(
-    [{Att, Name, Len, Type, Encoding} | RestAtts],
-    Boundary,
-    WriteFun,
-    AttFun
-) ->
-    LengthBin = list_to_binary(integer_to_list(Len)),
-    % write headers
-    WriteFun(<<"\r\nContent-Disposition: attachment; filename=\"", Name/binary, "\"">>),
-    WriteFun(<<"\r\nContent-Type: ", Type/binary>>),
-    WriteFun(<<"\r\nContent-Length: ", LengthBin/binary>>),
-    case Encoding of
-        identity ->
-            ok;
-        _ ->
-            EncodingBin = atom_to_binary(Encoding, latin1),
-            WriteFun(<<"\r\nContent-Encoding: ", EncodingBin/binary>>)
-    end,
-
-    % write data
-    WriteFun(<<"\r\n\r\n">>),
-    AttFun(Att, fun(Data, _) -> WriteFun(Data) end, ok),
-    WriteFun(<<"\r\n--", Boundary/binary>>),
-    atts_to_mp(RestAtts, Boundary, WriteFun, AttFun).
-
-length_multipart_stream(Boundary, JsonBytes, Atts) ->
-    AttsSize = lists:foldl(
-        fun({_Att, Name, Len, Type, Encoding}, AccAttsSize) ->
-            AccAttsSize +
-                % "\r\n\r\n"
-                4 +
-                length(integer_to_list(Len)) +
-                Len +
-                % "\r\n--"
-                4 +
-                size(Boundary) +
-                % attachment headers
-                % (the length of the Content-Length has already been set)
-                size(Name) +
-                size(Type) +
-                length("\r\nContent-Disposition: attachment; filename=\"\"") +
-                length("\r\nContent-Type: ") +
-                length("\r\nContent-Length: ") +
-                case Encoding of
-                    identity ->
-                        0;
-                    _ ->
-                        length(atom_to_list(Encoding)) +
-                            length("\r\nContent-Encoding: ")
-                end
-        end,
-        0,
-        Atts
-    ),
-    if
-        AttsSize == 0 ->
-            {<<"application/json">>, iolist_size(JsonBytes)};
-        true ->
-            {
-                <<"multipart/related; boundary=\"", Boundary/binary, "\"">>,
-                % "--"
-                2 +
-                    size(Boundary) +
-                    % "\r\ncontent-type: application/json\r\n\r\n"
-                    36 +
-                    iolist_size(JsonBytes) +
-                    % "\r\n--"
-                    4 +
-                    size(Boundary) +
-                    +AttsSize +
-                    % "--"
-                    2
-            }
-    end.
-
-abort_multipart_stream(Parser) ->
-    MonRef = erlang:monitor(process, Parser),
-    Parser ! abort_parsing,
-    receive
-        {'DOWN', MonRef, _, _, _} -> ok
-    after 60000 ->
-        % One minute is quite on purpose for this timeout. We
-        % want to try and read data to keep the socket open
-        % when possible but we also don't want to just make
-        % this a super long timeout because people have to
-        % wait this long to see if they just had an error
-        % like a validate_doc_update failure.
-        throw(multi_part_abort_timeout)
-    end.