diff options
-rw-r--r-- | src/couch/src/couch_httpd_multipart.erl | 77 |
1 files changed, 65 insertions, 12 deletions
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl index 6ce3c76fe..e556b283f 100644 --- a/src/couch/src/couch_httpd_multipart.erl +++ b/src/couch/src/couch_httpd_multipart.erl @@ -99,12 +99,24 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) -> abort_parsing -> ok; {get_bytes, Ref, From} -> - C2 = orddict:update_counter(From, 1, Counters), - NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}), - mp_parse_atts(eof, NewAcc); + C2 = update_writer(From, Counters), + case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}) of + abort_parsing -> + ok; + NewAcc -> + mp_parse_atts(eof, NewAcc) + end; {'DOWN', ParentRef, _, _, _} -> - exit(mp_reader_coordinator_died) - after 3600000 -> + exit(mp_reader_coordinator_died); + {'DOWN', WriterRef, _, WriterPid, _} -> + case remove_writer(WriterPid, WriterRef, Counters) of + abort_parsing -> + ok; + C2 -> + NewAcc = {Ref, Chunks, Offset, C2, Waiting -- [WriterPid]}, + mp_parse_atts(eof, NewAcc) + end + after 300000 -> ok end end. @@ -116,12 +128,12 @@ mp_abort_parse_atts(_, _) -> maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> receive {get_bytes, Ref, From} -> - NewCounters = orddict:update_counter(From, 1, Counters), + NewCounters = update_writer(From, Counters), maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]}) after 0 -> % reply to as many writers as possible NewWaiting = lists:filter(fun(Writer) -> - WhichChunk = orddict:fetch(Writer, Counters), + {_, WhichChunk} = orddict:fetch(Writer, Counters), ListIndex = WhichChunk - Offset, if ListIndex =< length(Chunks) -> Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)}, @@ -132,11 +144,11 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> end, Waiting), % check if we can drop a chunk from the head of the list - case Counters of + SmallestIndex = case Counters of [] -> - SmallestIndex = 0; + 0; _ -> - SmallestIndex = lists:min(element(2, lists:unzip(Counters))) + lists:min([C || {_WPid, {_WRef, C}} <- Counters]) end, Size = length(Counters), N = num_mp_writers(), @@ -149,7 +161,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> end, % we should wait for a writer if no one has written the last chunk - LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]), + LargestIndex = lists:max([0] ++ [C || {_WPid, {_WRef, C}} <- Counters]), if LargestIndex >= (Offset + length(Chunks)) -> % someone has written all possible chunks, keep moving {Ref, NewChunks, NewOffset, Counters, NewWaiting}; @@ -160,14 +172,55 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> abort_parsing; {'DOWN', ParentRef, _, _, _} -> exit(mp_reader_coordinator_died); + {'DOWN', WriterRef, _, WriterPid, _} -> + case remove_writer(WriterPid, WriterRef, Counters) of + abort_parsing -> + abort_parsing; + C2 -> + RestWaiting = NewWaiting -- [WriterPid], + NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting}, + maybe_send_data(NewAcc) + end; {get_bytes, Ref, X} -> - C2 = orddict:update_counter(X, 1, Counters), + C2 = update_writer(X, Counters), maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]}) + after 300000 -> + abort_parsing end end end. +update_writer(WriterPid, Counters) -> + UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end, + InitialValue = case orddict:find(WriterPid, Counters) of + {ok, IV} -> + IV; + error -> + WriterRef = erlang:monitor(process, WriterPid), + {WriterRef, 1} + end, + orddict:update(WriterPid, UpdateFun, InitialValue, Counters). + + +remove_writer(WriterPid, WriterRef, Counters) -> + case orddict:find(WriterPid, Counters) of + {ok, {WriterRef, _}} -> + case num_mp_writers() of + N when N > 1 -> + num_mp_writers(N - 1); + _ -> + abort_parsing + end; + {ok, _} -> + % We got a different ref fired for a known worker + abort_parsing; + error -> + % Unknown worker pid? + abort_parsing + end. + + num_mp_writers(N) -> erlang:put(mp_att_writers, N). |