diff options
authorNick Vatamaniuc <>2022-04-29 17:35:19 -0400
committerNick Vatamaniuc <>2022-05-18 14:02:53 -0400
commit0bc85e9400aa78e6782c5aa2c2303876bc17d527 (patch)
parentd179196f3ee9e31d9325a404e8e6ba5fa6e24b22 (diff)
Make sure to send hello_from_writer only once for each worker
Assuming a document may have multiple attachments, or even the possibility of having multiple documents each with a different parsers, send `hello_from_writer` once per worker process as soon as possible in fabric_rpc. For additional safety (belt and suspenders) assert that the closure is called from the same process which sent hello and started monitoring the parser, and if mp_parser_died is caught, and the data function is called again, re-throw the same mp_parser_died error.
1 files changed, 32 insertions, 4 deletions
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 80a2de67c..cfda1a37e 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -637,21 +637,49 @@ make_att_readers([#doc{atts = Atts0} = Doc | Rest]) ->
Atts = [couch_att:transform(data, fun make_att_reader/1, Att) || Att <- Atts0],
[Doc#doc{atts = Atts} | make_att_readers(Rest)].
-make_att_reader({follows, Parser, Ref}) ->
+make_att_reader({follows, Parser, Ref}) when is_pid(Parser) ->
% This code will fail if the returned closure is called by a
% process other than the one that called make_att_reader/1 in the
% first place. The reason we don't put everything inside the
% closure is that the `hello_from_writer` message must *always* be
- % sent to the parser, even if the closure never gets called.
- ParserRef = erlang:monitor(process, Parser),
- Parser ! {hello_from_writer, Ref, self()},
+ % sent to the parser, even if the closure never gets called. Also,
+ % make sure `hello_from_writer` is sent only once for the all the
+ % rest of the possible attachments.
+ WriterPid = self(),
+ ParserRef =
+ case get({mp_parser_ref, Parser}) of
+ undefined ->
+ % First time encountering a particular parser pid. Monitor it,
+ % in case it dies, and notify it about us, so it could monitor
+ % us in case we die.
+ PRef = erlang:monitor(process, Parser),
+ put({mp_parser_ref, Parser}, PRef),
+ Parser ! {hello_from_writer, Ref, WriterPid},
+ PRef;
+ Else ->
+ Else
+ end,
fun() ->
+ % Make sure the closure is always called from the same process which
+ % sent the hello_from_writer message.
+ case self() =:= WriterPid of
+ true -> ok;
+ false -> error({make_att_pid_assertion, self(), WriterPid})
+ end,
+ % Check if parser already died. This is for belt and suspenders mostly,
+ % in case somehow we call the data function again after mp_parser_died
+ % was thrown, so we are not stuck forever waiting for bytes.
+ case get({mp_parser_died, Parser}) of
+ undefined -> ok;
+ AlreadyDiedReason -> throw({mp_parser_died, AlreadyDiedReason})
+ end,
Parser ! {get_bytes, Ref, self()},
{bytes, Ref, Bytes} ->
{'DOWN', ParserRef, _, _, Reason} ->
+ put({mp_parser_died, Parser}, Reason),
throw({mp_parser_died, Reason})