summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Lehnardt <jan@apache.org>2018-02-13 15:32:29 +0100
committerJan Lehnardt <jan@apache.org>2018-02-13 15:32:29 +0100
commitc64d8e55cb4c8116d4db1d6e7e84433bcdec24e2 (patch)
tree51a579bd3e5c4efda46225c1ec08495287951724
parentd35f00aae7f32a97988e5dac4c37de7030c74a0d (diff)
downloadcouchdb-fix/745/davisp.tar.gz
fix: do not pile up multipart parser processes, by @davisparchive/fix/745/davispfix/745/davisp
-rw-r--r--src/couch/src/couch_httpd_multipart.erl69
1 files changed, 61 insertions, 8 deletions
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl
index 6ce3c76fe..f38cef8b5 100644
--- a/src/couch/src/couch_httpd_multipart.erl
+++ b/src/couch/src/couch_httpd_multipart.erl
@@ -99,12 +99,24 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
abort_parsing ->
ok;
{get_bytes, Ref, From} ->
- C2 = orddict:update_counter(From, 1, Counters),
- NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}),
- mp_parse_atts(eof, NewAcc);
+ C2 = update_writer(From, Counters),
+ case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}) of
+ abort_parsing ->
+ ok;
+ NewAcc ->
+ mp_parse_atts(eof, NewAcc)
+ end;
{'DOWN', ParentRef, _, _, _} ->
- exit(mp_reader_coordinator_died)
- after 3600000 ->
+ exit(mp_reader_coordinator_died);
+ {'DOWN', WriterRef, _, WriterPid, _} ->
+ case remove_writer(WriterPid, WriterRef, Counters) of
+ abort_parsing ->
+ ok;
+ C2 ->
+ NewAcc = {Ref, Chunks, Offset, C2, Waiting -- [WriterPid]},
+ mp_parse_atts(eof, NewAcc)
+ end
+ after 300000 ->
ok
end
end.
@@ -116,12 +128,12 @@ mp_abort_parse_atts(_, _) ->
maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
receive {get_bytes, Ref, From} ->
- NewCounters = orddict:update_counter(From, 1, Counters),
+ NewCounters = update_writer(From, Counters),
maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]})
after 0 ->
% reply to as many writers as possible
NewWaiting = lists:filter(fun(Writer) ->
- WhichChunk = orddict:fetch(Writer, Counters),
+ {_, WhichChunk} = orddict:fetch(Writer, Counters),
ListIndex = WhichChunk - Offset,
if ListIndex =< length(Chunks) ->
Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)},
@@ -160,14 +172,55 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
abort_parsing;
{'DOWN', ParentRef, _, _, _} ->
exit(mp_reader_coordinator_died);
+ {'DOWN', WriterRef, _, WriterPid, _} ->
+ case remove_writer(WriterPid, WriterRef, Counters) of
+ abort_parsing ->
+ abort_parsing;
+ C2 ->
+ RestWaiting = NewWaiting -- [WriterPid],
+ NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting},
+ maybe_send_data(NewAcc)
+ end;
{get_bytes, Ref, X} ->
- C2 = orddict:update_counter(X, 1, Counters),
+ C2 = update_writer(X, Counters),
maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]})
+ after 300000 ->
+ abort_parsing
end
end
end.
+update_writer(WriterPid, Counters) ->
+ UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end,
+ InitialValue = case orddict:find(WriterPid, Counters) of
+ {ok, IV} ->
+ IV;
+ error ->
+ WriterRef = erlang:monitor(process, WriterPid),
+ {WriterRef, 1}
+ end,
+ orddict:update(WriterPid, UpdateFun, InitialValue, Counters).
+
+
+remove_writer(WriterPid, WriterRef, Counters) ->
+ case orddict:find(WriterPid, Counters) of
+ {ok, {WriterRef, _}} ->
+ case num_mp_writers() of
+ N when N > 1 ->
+ num_mp_writers(N - 1);
+ _ ->
+ abort_parsing
+ end;
+ {ok, _} ->
+ % We got a different ref fired for a known worker
+ abort_parsing;
+ error ->
+ % Unknown worker pid?
+ abort_parsing
+ end.
+
+
num_mp_writers(N) ->
erlang:put(mp_att_writers, N).