summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Lehnardt <jan@apache.org>2018-02-13 15:32:29 +0100
committerPaul J. Davis <paul.joseph.davis@gmail.com>2018-02-22 15:32:22 -0600
commitce5dd3fb74a497b790e3149362fea0cba6864055 (patch)
tree760955ad96a00ef3697eabdf5a61e94f43662634
parent6d959a7acab7d0ff110c7799f3295b6193f45c2f (diff)
downloadcouchdb-745-fix-zombie-multipart-writers.tar.gz
Prevent chttpd multipart zombie processes745-fix-zombie-multipart-writers
Occasionally it's possible to lose track of our RPC workers in the main multipart parsing code. This change monitors each worker process and then exits if all workers have exited before the parser considers itself finished. Fixes part of #745
-rw-r--r--src/couch/src/couch_httpd_multipart.erl77
1 files changed, 65 insertions, 12 deletions
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl
index 6ce3c76fe..e556b283f 100644
--- a/src/couch/src/couch_httpd_multipart.erl
+++ b/src/couch/src/couch_httpd_multipart.erl
@@ -99,12 +99,24 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
abort_parsing ->
ok;
{get_bytes, Ref, From} ->
- C2 = orddict:update_counter(From, 1, Counters),
- NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}),
- mp_parse_atts(eof, NewAcc);
+ C2 = update_writer(From, Counters),
+ case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}) of
+ abort_parsing ->
+ ok;
+ NewAcc ->
+ mp_parse_atts(eof, NewAcc)
+ end;
{'DOWN', ParentRef, _, _, _} ->
- exit(mp_reader_coordinator_died)
- after 3600000 ->
+ exit(mp_reader_coordinator_died);
+ {'DOWN', WriterRef, _, WriterPid, _} ->
+ case remove_writer(WriterPid, WriterRef, Counters) of
+ abort_parsing ->
+ ok;
+ C2 ->
+ NewAcc = {Ref, Chunks, Offset, C2, Waiting -- [WriterPid]},
+ mp_parse_atts(eof, NewAcc)
+ end
+ after 300000 ->
ok
end
end.
@@ -116,12 +128,12 @@ mp_abort_parse_atts(_, _) ->
maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
receive {get_bytes, Ref, From} ->
- NewCounters = orddict:update_counter(From, 1, Counters),
+ NewCounters = update_writer(From, Counters),
maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]})
after 0 ->
% reply to as many writers as possible
NewWaiting = lists:filter(fun(Writer) ->
- WhichChunk = orddict:fetch(Writer, Counters),
+ {_, WhichChunk} = orddict:fetch(Writer, Counters),
ListIndex = WhichChunk - Offset,
if ListIndex =< length(Chunks) ->
Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)},
@@ -132,11 +144,11 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
end, Waiting),
% check if we can drop a chunk from the head of the list
- case Counters of
+ SmallestIndex = case Counters of
[] ->
- SmallestIndex = 0;
+ 0;
_ ->
- SmallestIndex = lists:min(element(2, lists:unzip(Counters)))
+ lists:min([C || {_WPid, {_WRef, C}} <- Counters])
end,
Size = length(Counters),
N = num_mp_writers(),
@@ -149,7 +161,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
end,
% we should wait for a writer if no one has written the last chunk
- LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]),
+ LargestIndex = lists:max([0] ++ [C || {_WPid, {_WRef, C}} <- Counters]),
if LargestIndex >= (Offset + length(Chunks)) ->
% someone has written all possible chunks, keep moving
{Ref, NewChunks, NewOffset, Counters, NewWaiting};
@@ -160,14 +172,55 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
abort_parsing;
{'DOWN', ParentRef, _, _, _} ->
exit(mp_reader_coordinator_died);
+ {'DOWN', WriterRef, _, WriterPid, _} ->
+ case remove_writer(WriterPid, WriterRef, Counters) of
+ abort_parsing ->
+ abort_parsing;
+ C2 ->
+ RestWaiting = NewWaiting -- [WriterPid],
+ NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting},
+ maybe_send_data(NewAcc)
+ end;
{get_bytes, Ref, X} ->
- C2 = orddict:update_counter(X, 1, Counters),
+ C2 = update_writer(X, Counters),
maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]})
+ after 300000 ->
+ abort_parsing
end
end
end.
+update_writer(WriterPid, Counters) ->
+ UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end,
+ InitialValue = case orddict:find(WriterPid, Counters) of
+ {ok, IV} ->
+ IV;
+ error ->
+ WriterRef = erlang:monitor(process, WriterPid),
+ {WriterRef, 1}
+ end,
+ orddict:update(WriterPid, UpdateFun, InitialValue, Counters).
+
+
+remove_writer(WriterPid, WriterRef, Counters) ->
+ case orddict:find(WriterPid, Counters) of
+ {ok, {WriterRef, _}} ->
+ case num_mp_writers() of
+ N when N > 1 ->
+ num_mp_writers(N - 1);
+ _ ->
+ abort_parsing
+ end;
+ {ok, _} ->
+ % We got a different ref fired for a known worker
+ abort_parsing;
+ error ->
+ % Unknown worker pid?
+ abort_parsing
+ end.
+
+
num_mp_writers(N) ->
erlang:put(mp_att_writers, N).