summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Adam Kocoloski <kocolosk@apache.org> 2022-02-23 15:45:57 -0500
committer: Nick Vatamaniuc <nickva@users.noreply.github.com> 2022-05-18 14:02:53 -0400
commit: 8601e40f624bf9b7f0c067073ca13040c8026978 (patch)
tree: 74b3ad1d4381538342f1cd3119e8db7e5f053595
parent: 9b9dc6ccc79b327594da4ae61328fe09b06fc584 (diff)
download: couchdb-8601e40f624bf9b7f0c067073ca13040c8026978.tar.gz
Ensure the parser always monitors the worker
This adds an extra `hello_from_writer` message to the handshake between the process that reads the multipart attachment data from the socket (the parser) and the writer processes (potentially on remote nodes) that persist the data into each shard file. This ensures that even when a writer never asks for the data (e.g. because the revision already exists in the tree), the parser still monitors the writer and therefore knows when the writer has exited. The patch assumes that the attachment flush function is executed in the same process as the one initially spawned to handle the fabric_rpc work request. That is true today, but if it changed in the future, the resulting breakage would be non-obvious to debug.
-rw-r--r-- src/couch/src/couch_httpd_multipart.erl 22
-rw-r--r-- src/fabric/src/fabric_rpc.erl 16
2 files changed, 20 insertions, 18 deletions
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl
index 95a2c9e3c..b397502c0 100644
--- a/src/couch/src/couch_httpd_multipart.erl
+++ b/src/couch/src/couch_httpd_multipart.erl
@@ -104,6 +104,10 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
receive
abort_parsing ->
ok;
+ {hello_from_writer, Ref, WriterPid} ->
+ WriterRef = erlang:monitor(process, WriterPid),
+ NewCounters = orddict:store(WriterPid, {WriterRef, 0}, Counters),
+ mp_parse_atts(eof, {Ref, Chunks, Offset, NewCounters, Waiting});
{get_bytes, Ref, From} ->
C2 = update_writer(From, Counters),
case maybe_send_data({Ref, Chunks, Offset, C2, [From | Waiting]}) of
@@ -134,6 +138,10 @@ mp_abort_parse_atts(_, _) ->
maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
receive
+ {hello_from_writer, Ref, WriterPid} ->
+ WriterRef = erlang:monitor(process, WriterPid),
+ NewCounters = orddict:store(WriterPid, {WriterRef, 0}, Counters),
+ maybe_send_data({Ref, Chunks, Offset, NewCounters, Waiting});
{get_bytes, Ref, From} ->
NewCounters = update_writer(From, Counters),
maybe_send_data({Ref, Chunks, Offset, NewCounters, [From | Waiting]})
@@ -195,6 +203,10 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting},
maybe_send_data(NewAcc)
end;
+ {hello_from_writer, Ref, WriterPid} ->
+ WriterRef = erlang:monitor(process, WriterPid),
+ C2 = orddict:store(WriterPid, {WriterRef, 0}, Counters),
+ maybe_send_data({Ref, NewChunks, NewOffset, C2, Waiting});
{get_bytes, Ref, X} ->
C2 = update_writer(X, Counters),
maybe_send_data({Ref, NewChunks, NewOffset, C2, [X | NewWaiting]})
@@ -206,15 +218,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
update_writer(WriterPid, Counters) ->
UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end,
- InitialValue =
- case orddict:find(WriterPid, Counters) of
- {ok, IV} ->
- IV;
- error ->
- WriterRef = erlang:monitor(process, WriterPid),
- {WriterRef, 1}
- end,
- orddict:update(WriterPid, UpdateFun, InitialValue, Counters).
+ orddict:update(WriterPid, UpdateFun, Counters).
remove_writer(WriterPid, WriterRef, Counters) ->
case orddict:find(WriterPid, Counters) of
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index a90c94ade..80a2de67c 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -638,16 +638,14 @@ make_att_readers([#doc{atts = Atts0} = Doc | Rest]) ->
[Doc#doc{atts = Atts} | make_att_readers(Rest)].
make_att_reader({follows, Parser, Ref}) ->
+ % This code will fail if the returned closure is called by a
+ % process other than the one that called make_att_reader/1 in the
+ % first place. The reason we don't put everything inside the
+ % closure is that the `hello_from_writer` message must *always* be
+ % sent to the parser, even if the closure never gets called.
+ ParserRef = erlang:monitor(process, Parser),
+ Parser ! {hello_from_writer, Ref, self()},
fun() ->
- ParserRef =
- case get(mp_parser_ref) of
- undefined ->
- PRef = erlang:monitor(process, Parser),
- put(mp_parser_ref, PRef),
- PRef;
- Else ->
- Else
- end,
Parser ! {get_bytes, Ref, self()},
receive
{bytes, Ref, Bytes} ->