diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2013-04-02 18:05:30 -0500 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2013-10-02 11:59:31 -0400 |
commit | 8d6c59d55b097d8db23adef5077df4871e25c34d (patch) | |
tree | de0793f1eae20df11863892beede302eb5db470f | |
parent | fa982654b10a8d79ea934c5d9b5cb91ff7998e45 (diff) | |
download | couchdb-8d6c59d55b097d8db23adef5077df4871e25c34d.tar.gz |
Fix replication deadlock after HTTP retries
An HTTP failure in open_doc_revs can lead to a replication deadlock
if the retry happens while parsing the multipart/mime response. The
UserFun callback can end up attempting to PUT a request using an
attachment reader from the failed request, while the pid that invoked
open_doc_revs never sees the started_open_doc_revs message that
indicates a retry has happened.
This change runs the user function in a spawned middleman process so that
the process that invoked open_doc_revs can listen for the user function's
response as well as the started_open_doc_revs message, and handle retries
appropriately.
-rw-r--r-- | src/couch_replicator/src/couch_replicator_api_wrap.erl | 36 |
1 files changed, 33 insertions, 3 deletions
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index 4aabe15a1..99215609a 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -535,7 +535,7 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) -> fun() -> receive_doc_data(Streamer, Ref) end, Ref) of {ok, Doc, Parser} -> - case UserFun({ok, Doc}, UserAcc) of + case run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref) of {ok, UserAcc2} -> ok; {skip, UserAcc2} -> @@ -546,13 +546,13 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) -> {"application/json", []} -> Doc = couch_doc:from_json_obj( ?JSON_DECODE(receive_all(Streamer, Ref, []))), - {_, UserAcc2} = UserFun({ok, Doc}, UserAcc), + {_, UserAcc2} = run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref), receive_docs(Streamer, UserFun, Ref, UserAcc2); {"application/json", [{"error","true"}]} -> {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])), Rev = get_value(<<"missing">>, ErrorProps), Result = {{not_found, missing}, couch_doc:parse_rev(Rev)}, - {_, UserAcc2} = UserFun(Result, UserAcc), + {_, UserAcc2} = run_user_fun(UserFun, Result, UserAcc, Ref), receive_docs(Streamer, UserFun, Ref, UserAcc2) end; {done, Ref} -> @@ -560,6 +560,36 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) -> end. 
+run_user_fun(UserFun, Arg, UserAcc, OldRef) -> + {Pid, Ref} = spawn_monitor(fun() -> + try UserFun(Arg, UserAcc) of + Resp -> + exit({exit_ok, Resp}) + catch + throw:Reason -> + exit({exit_throw, Reason}); + error:Reason -> + exit({exit_error, Reason}); + exit:Reason -> + exit({exit_exit, Reason}) + end + end), + receive + {started_open_doc_revs, NewRef} -> + erlang:demonitor(Ref, [flush]), + exit(Pid, kill), + restart_remote_open_doc_revs(OldRef, NewRef); + {'DOWN', Ref, process, Pid, {exit_ok, Ret}} -> + Ret; + {'DOWN', Ref, process, Pid, {exit_throw, Reason}} -> + throw(Reason); + {'DOWN', Ref, process, Pid, {exit_error, Reason}} -> + erlang:error(Reason); + {'DOWN', Ref, process, Pid, {exit_exit, Reason}} -> + erlang:exit(Reason) + end. + + restart_remote_open_doc_revs(Ref, NewRef) -> receive {body_bytes, Ref, _} -> |