diff options
author | Jan Lehnardt <jan@apache.org> | 2018-02-13 15:32:29 +0100 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2018-02-23 10:48:10 -0600 |
commit | 083239353e919e897b97e8a96ee07cb42ca4eccd (patch) | |
tree | 760955ad96a00ef3697eabdf5a61e94f43662634 | |
parent | 6d959a7acab7d0ff110c7799f3295b6193f45c2f (diff) | |
download | couchdb-083239353e919e897b97e8a96ee07cb42ca4eccd.tar.gz |
Prevent chttpd multipart zombie processes
Occasionally it's possible to lose track of our RPC workers in the main
multipart parsing code. This change monitors each worker process and
then exits if all workers have exited before the parser considers itself
finished.
Fixes part of #745
-rw-r--r-- | src/couch/src/couch_httpd_multipart.erl | 77 |
1 files changed, 65 insertions, 12 deletions
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl index 6ce3c76fe..e556b283f 100644 --- a/src/couch/src/couch_httpd_multipart.erl +++ b/src/couch/src/couch_httpd_multipart.erl @@ -99,12 +99,24 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) -> abort_parsing -> ok; {get_bytes, Ref, From} -> - C2 = orddict:update_counter(From, 1, Counters), - NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}), - mp_parse_atts(eof, NewAcc); + C2 = update_writer(From, Counters), + case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}) of + abort_parsing -> + ok; + NewAcc -> + mp_parse_atts(eof, NewAcc) + end; {'DOWN', ParentRef, _, _, _} -> - exit(mp_reader_coordinator_died) - after 3600000 -> + exit(mp_reader_coordinator_died); + {'DOWN', WriterRef, _, WriterPid, _} -> + case remove_writer(WriterPid, WriterRef, Counters) of + abort_parsing -> + ok; + C2 -> + NewAcc = {Ref, Chunks, Offset, C2, Waiting -- [WriterPid]}, + mp_parse_atts(eof, NewAcc) + end + after 300000 -> ok end end. @@ -116,12 +128,12 @@ mp_abort_parse_atts(_, _) -> maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> receive {get_bytes, Ref, From} -> - NewCounters = orddict:update_counter(From, 1, Counters), + NewCounters = update_writer(From, Counters), maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]}) after 0 -> % reply to as many writers as possible NewWaiting = lists:filter(fun(Writer) -> - WhichChunk = orddict:fetch(Writer, Counters), + {_, WhichChunk} = orddict:fetch(Writer, Counters), ListIndex = WhichChunk - Offset, if ListIndex =< length(Chunks) -> Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)}, @@ -132,11 +144,11 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> end, Waiting), % check if we can drop a chunk from the head of the list - case Counters of + SmallestIndex = case Counters of [] -> - SmallestIndex = 0; + 0; _ -> - SmallestIndex = lists:min(element(2, lists:unzip(Counters))) + lists:min([C || {_WPid, {_WRef, C}} <- Counters]) end, Size = length(Counters), N = num_mp_writers(), @@ -149,7 +161,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> end, % we should wait for a writer if no one has written the last chunk - LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]), + LargestIndex = lists:max([0] ++ [C || {_WPid, {_WRef, C}} <- Counters]), if LargestIndex >= (Offset + length(Chunks)) -> % someone has written all possible chunks, keep moving {Ref, NewChunks, NewOffset, Counters, NewWaiting}; @@ -160,14 +172,55 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> abort_parsing; {'DOWN', ParentRef, _, _, _} -> exit(mp_reader_coordinator_died); + {'DOWN', WriterRef, _, WriterPid, _} -> + case remove_writer(WriterPid, WriterRef, Counters) of + abort_parsing -> + abort_parsing; + C2 -> + RestWaiting = NewWaiting -- [WriterPid], + NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting}, + maybe_send_data(NewAcc) + end; {get_bytes, Ref, X} -> - C2 = orddict:update_counter(X, 1, Counters), + C2 = update_writer(X, Counters), maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]}) + after 300000 -> + abort_parsing end end end. +update_writer(WriterPid, Counters) -> + UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end, + InitialValue = case orddict:find(WriterPid, Counters) of + {ok, IV} -> + IV; + error -> + WriterRef = erlang:monitor(process, WriterPid), + {WriterRef, 1} + end, + orddict:update(WriterPid, UpdateFun, InitialValue, Counters). + + +remove_writer(WriterPid, WriterRef, Counters) -> + case orddict:find(WriterPid, Counters) of + {ok, {WriterRef, _}} -> + case num_mp_writers() of + N when N > 1 -> + num_mp_writers(N - 1); + _ -> + abort_parsing + end; + {ok, _} -> + % We got a different ref fired for a known worker + abort_parsing; + error -> + % Unknown worker pid? + abort_parsing + end. + + num_mp_writers(N) -> erlang:put(mp_att_writers, N). |