diff options
author | Adam Kocoloski <kocolosk@apache.org> | 2022-02-23 15:45:57 -0500 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2022-05-18 14:02:53 -0400 |
commit | 8601e40f624bf9b7f0c067073ca13040c8026978 (patch) | |
tree | 74b3ad1d4381538342f1cd3119e8db7e5f053595 | |
parent | 9b9dc6ccc79b327594da4ae61328fe09b06fc584 (diff) | |
download | couchdb-8601e40f624bf9b7f0c067073ca13040c8026978.tar.gz |
Ensure the parser always monitors the worker
This adds an extra `hello_from_writer` message into the handshake
between the process that reads the multipart attachment data from the
socket and the writer processes (potentially on remote nodes) that
persist the data into each shard file. This ensures that even in the
case where a writer does not end up asking for the data (e.g. because
the revision already exists in the tree), the parser will monitor the
writer and therefore know when the writer has exited.
The patch assumes that the attachment flush function is executed in
the same process that is initially spawned to handle the fabric_rpc
work request. That's true today, but if that changed in the future the
resulting breakage would be non-obvious and hard to debug.
-rw-r--r-- | src/couch/src/couch_httpd_multipart.erl | 22 | ||||
-rw-r--r-- | src/fabric/src/fabric_rpc.erl | 16 |
2 files changed, 20 insertions, 18 deletions
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl index 95a2c9e3c..b397502c0 100644 --- a/src/couch/src/couch_httpd_multipart.erl +++ b/src/couch/src/couch_httpd_multipart.erl @@ -104,6 +104,10 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) -> receive abort_parsing -> ok; + {hello_from_writer, Ref, WriterPid} -> + WriterRef = erlang:monitor(process, WriterPid), + NewCounters = orddict:store(WriterPid, {WriterRef, 0}, Counters), + mp_parse_atts(eof, {Ref, Chunks, Offset, NewCounters, Waiting}); {get_bytes, Ref, From} -> C2 = update_writer(From, Counters), case maybe_send_data({Ref, Chunks, Offset, C2, [From | Waiting]}) of @@ -134,6 +138,10 @@ mp_abort_parse_atts(_, _) -> maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> receive + {hello_from_writer, Ref, WriterPid} -> + WriterRef = erlang:monitor(process, WriterPid), + NewCounters = orddict:store(WriterPid, {WriterRef, 0}, Counters), + maybe_send_data({Ref, Chunks, Offset, NewCounters, Waiting}); {get_bytes, Ref, From} -> NewCounters = update_writer(From, Counters), maybe_send_data({Ref, Chunks, Offset, NewCounters, [From | Waiting]}) @@ -195,6 +203,10 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting}, maybe_send_data(NewAcc) end; + {hello_from_writer, Ref, WriterPid} -> + WriterRef = erlang:monitor(process, WriterPid), + C2 = orddict:store(WriterPid, {WriterRef, 0}, Counters), + maybe_send_data({Ref, NewChunks, NewOffset, C2, Waiting}); {get_bytes, Ref, X} -> C2 = update_writer(X, Counters), maybe_send_data({Ref, NewChunks, NewOffset, C2, [X | NewWaiting]}) @@ -206,15 +218,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> update_writer(WriterPid, Counters) -> UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end, - InitialValue = - case orddict:find(WriterPid, Counters) of - {ok, IV} -> - IV; - error -> - WriterRef = erlang:monitor(process, 
WriterPid), - {WriterRef, 1} - end, - orddict:update(WriterPid, UpdateFun, InitialValue, Counters). + orddict:update(WriterPid, UpdateFun, Counters). remove_writer(WriterPid, WriterRef, Counters) -> case orddict:find(WriterPid, Counters) of diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index a90c94ade..80a2de67c 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -638,16 +638,14 @@ make_att_readers([#doc{atts = Atts0} = Doc | Rest]) -> [Doc#doc{atts = Atts} | make_att_readers(Rest)]. make_att_reader({follows, Parser, Ref}) -> + % This code will fail if the returned closure is called by a + % process other than the one that called make_att_reader/1 in the + % first place. The reason we don't put everything inside the + % closure is that the `hello_from_writer` message must *always* be + % sent to the parser, even if the closure never gets called. + ParserRef = erlang:monitor(process, Parser), + Parser ! {hello_from_writer, Ref, self()}, fun() -> - ParserRef = - case get(mp_parser_ref) of - undefined -> - PRef = erlang:monitor(process, Parser), - put(mp_parser_ref, PRef), - PRef; - Else -> - Else - end, Parser ! {get_bytes, Ref, self()}, receive {bytes, Ref, Bytes} -> |