diff options
author | Jan Lehnardt <jan@apache.org> | 2018-02-13 15:32:29 +0100 |
---|---|---|
committer | Jan Lehnardt <jan@apache.org> | 2018-02-13 15:32:29 +0100 |
commit | c64d8e55cb4c8116d4db1d6e7e84433bcdec24e2 (patch) | |
tree | 51a579bd3e5c4efda46225c1ec08495287951724 | |
parent | d35f00aae7f32a97988e5dac4c37de7030c74a0d (diff) | |
download | couchdb-fix/745/davisp.tar.gz |
fix: do not pile up multipart parser processes, by @davisparchive/fix/745/davispfix/745/davisp
-rw-r--r-- | src/couch/src/couch_httpd_multipart.erl | 69 |
1 files changed, 61 insertions, 8 deletions
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl index 6ce3c76fe..f38cef8b5 100644 --- a/src/couch/src/couch_httpd_multipart.erl +++ b/src/couch/src/couch_httpd_multipart.erl @@ -99,12 +99,24 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) -> abort_parsing -> ok; {get_bytes, Ref, From} -> - C2 = orddict:update_counter(From, 1, Counters), - NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}), - mp_parse_atts(eof, NewAcc); + C2 = update_writer(From, Counters), + case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}) of + abort_parsing -> + ok; + NewAcc -> + mp_parse_atts(eof, NewAcc) + end; {'DOWN', ParentRef, _, _, _} -> - exit(mp_reader_coordinator_died) - after 3600000 -> + exit(mp_reader_coordinator_died); + {'DOWN', WriterRef, _, WriterPid, _} -> + case remove_writer(WriterPid, WriterRef, Counters) of + abort_parsing -> + ok; + C2 -> + NewAcc = {Ref, Chunks, Offset, C2, Waiting -- [WriterPid]}, + mp_parse_atts(eof, NewAcc) + end + after 300000 -> ok end end. @@ -116,12 +128,12 @@ mp_abort_parse_atts(_, _) -> maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> receive {get_bytes, Ref, From} -> - NewCounters = orddict:update_counter(From, 1, Counters), + NewCounters = update_writer(From, Counters), maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]}) after 0 -> % reply to as many writers as possible NewWaiting = lists:filter(fun(Writer) -> - WhichChunk = orddict:fetch(Writer, Counters), + {_, WhichChunk} = orddict:fetch(Writer, Counters), ListIndex = WhichChunk - Offset, if ListIndex =< length(Chunks) -> Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)}, @@ -160,14 +172,55 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> abort_parsing; {'DOWN', ParentRef, _, _, _} -> exit(mp_reader_coordinator_died); + {'DOWN', WriterRef, _, WriterPid, _} -> + case remove_writer(WriterPid, WriterRef, Counters) of + abort_parsing -> + abort_parsing; + C2 -> + RestWaiting = NewWaiting -- [WriterPid], + NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting}, + maybe_send_data(NewAcc) + end; {get_bytes, Ref, X} -> - C2 = orddict:update_counter(X, 1, Counters), + C2 = update_writer(X, Counters), maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]}) + after 300000 -> + abort_parsing end end end. +update_writer(WriterPid, Counters) -> + UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end, + InitialValue = case orddict:find(WriterPid, Counters) of + {ok, IV} -> + IV; + error -> + WriterRef = erlang:monitor(process, WriterPid), + {WriterRef, 1} + end, + orddict:update(WriterPid, UpdateFun, InitialValue, Counters). + + +remove_writer(WriterPid, WriterRef, Counters) -> + case orddict:find(WriterPid, Counters) of + {ok, {WriterRef, _}} -> + case num_mp_writers() of + N when N > 1 -> + num_mp_writers(N - 1); + _ -> + abort_parsing + end; + {ok, _} -> + % We got a different ref fired for a known worker + abort_parsing; + error -> + % Unknown worker pid? + abort_parsing + end. + + num_mp_writers(N) -> erlang:put(mp_att_writers, N). |