diff options
author | Nick Vatamaniuc <vatamane@gmail.com> | 2022-04-29 17:35:19 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2022-05-18 14:02:53 -0400 |
commit | 0bc85e9400aa78e6782c5aa2c2303876bc17d527 (patch) | |
tree | f3c4d374733c5e43df9c187ed886e8d3b15378c6 | |
parent | d179196f3ee9e31d9325a404e8e6ba5fa6e24b22 (diff) | |
download | couchdb-0bc85e9400aa78e6782c5aa2c2303876bc17d527.tar.gz |
Make sure to send hello_from_writer only once for each worker
Assuming a document may have multiple attachments, or even the possibility of
having multiple documents each with a different parsers, send
`hello_from_writer` once per worker process as soon as possible in fabric_rpc.
For additional safety (belt and suspenders) assert that the closure is called
from the same process which sent hello and started monitoring the parser, and
if mp_parser_died is caught, and the data function is called again, re-throw
the same mp_parser_died error.
-rw-r--r-- | src/fabric/src/fabric_rpc.erl | 36 |
1 files changed, 32 insertions, 4 deletions
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 80a2de67c..cfda1a37e 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -637,21 +637,49 @@ make_att_readers([#doc{atts = Atts0} = Doc | Rest]) -> Atts = [couch_att:transform(data, fun make_att_reader/1, Att) || Att <- Atts0], [Doc#doc{atts = Atts} | make_att_readers(Rest)]. -make_att_reader({follows, Parser, Ref}) -> +make_att_reader({follows, Parser, Ref}) when is_pid(Parser) -> % This code will fail if the returned closure is called by a % process other than the one that called make_att_reader/1 in the % first place. The reason we don't put everything inside the % closure is that the `hello_from_writer` message must *always* be - % sent to the parser, even if the closure never gets called. - ParserRef = erlang:monitor(process, Parser), - Parser ! {hello_from_writer, Ref, self()}, + % sent to the parser, even if the closure never gets called. Also, + % make sure `hello_from_writer` is sent only once for the all the + % rest of the possible attachments. + WriterPid = self(), + ParserRef = + case get({mp_parser_ref, Parser}) of + undefined -> + % First time encountering a particular parser pid. Monitor it, + % in case it dies, and notify it about us, so it could monitor + % us in case we die. + PRef = erlang:monitor(process, Parser), + put({mp_parser_ref, Parser}, PRef), + Parser ! {hello_from_writer, Ref, WriterPid}, + PRef; + Else -> + Else + end, fun() -> + % Make sure the closure is always called from the same process which + % sent the hello_from_writer message. + case self() =:= WriterPid of + true -> ok; + false -> error({make_att_pid_assertion, self(), WriterPid}) + end, + % Check if parser already died. This is for belt and suspenders mostly, + % in case somehow we call the data function again after mp_parser_died + % was thrown, so we are not stuck forever waiting for bytes. + case get({mp_parser_died, Parser}) of + undefined -> ok; + AlreadyDiedReason -> throw({mp_parser_died, AlreadyDiedReason}) + end, Parser ! {get_bytes, Ref, self()}, receive {bytes, Ref, Bytes} -> rexi:reply(attachment_chunk_received), Bytes; {'DOWN', ParserRef, _, _, Reason} -> + put({mp_parser_died, Parser}, Reason), throw({mp_parser_died, Reason}) end end; |