summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2013-04-02 18:05:30 -0500
committerAdam Kocoloski <adam@cloudant.com>2013-10-02 11:59:31 -0400
commit8d6c59d55b097d8db23adef5077df4871e25c34d (patch)
treede0793f1eae20df11863892beede302eb5db470f
parentfa982654b10a8d79ea934c5d9b5cb91ff7998e45 (diff)
downloadcouchdb-8d6c59d55b097d8db23adef5077df4871e25c34d.tar.gz
Fix replication deadlock after HTTP retries
An HTTP failure in the open_doc_revs can lead to a replication deadlock if the retry happens while parsing the multipart/mime response. The UserFun callback can end up attempting to PUT a request using an attachment reader from the failed request while the pid that invoked open_doc_revs will never see the started_open_doc_revs message which indicates a retry has happened. This just spawns a middleman process so that the process that invoked open_doc_revs can listen for the response from the user function as well as the started_open_doc_revs message to handle retries appropriately.
-rw-r--r--src/couch_replicator/src/couch_replicator_api_wrap.erl36
1 files changed, 33 insertions, 3 deletions
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 4aabe15a1..99215609a 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -535,7 +535,7 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) ->
fun() -> receive_doc_data(Streamer, Ref) end,
Ref) of
{ok, Doc, Parser} ->
- case UserFun({ok, Doc}, UserAcc) of
+ case run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref) of
{ok, UserAcc2} ->
ok;
{skip, UserAcc2} ->
@@ -546,13 +546,13 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) ->
{"application/json", []} ->
Doc = couch_doc:from_json_obj(
?JSON_DECODE(receive_all(Streamer, Ref, []))),
- {_, UserAcc2} = UserFun({ok, Doc}, UserAcc),
+ {_, UserAcc2} = run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref),
receive_docs(Streamer, UserFun, Ref, UserAcc2);
{"application/json", [{"error","true"}]} ->
{ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
Rev = get_value(<<"missing">>, ErrorProps),
Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
- {_, UserAcc2} = UserFun(Result, UserAcc),
+ {_, UserAcc2} = run_user_fun(UserFun, Result, UserAcc, Ref),
receive_docs(Streamer, UserFun, Ref, UserAcc2)
end;
{done, Ref} ->
@@ -560,6 +560,36 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) ->
end.
+run_user_fun(UserFun, Arg, UserAcc, OldRef) ->
+ {Pid, Ref} = spawn_monitor(fun() ->
+ try UserFun(Arg, UserAcc) of
+ Resp ->
+ exit({exit_ok, Resp})
+ catch
+ throw:Reason ->
+ exit({exit_throw, Reason});
+ error:Reason ->
+ exit({exit_error, Reason});
+ exit:Reason ->
+ exit({exit_exit, Reason})
+ end
+ end),
+ receive
+ {started_open_doc_revs, NewRef} ->
+ erlang:demonitor(Ref, [flush]),
+ exit(Pid, kill),
+ restart_remote_open_doc_revs(OldRef, NewRef);
+ {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
+ Ret;
+ {'DOWN', Ref, process, Pid, {exit_throw, Reason}} ->
+ throw(Reason);
+ {'DOWN', Ref, process, Pid, {exit_error, Reason}} ->
+ erlang:error(Reason);
+ {'DOWN', Ref, process, Pid, {exit_exit, Reason}} ->
+ erlang:exit(Reason)
+ end.
+
+
restart_remote_open_doc_revs(Ref, NewRef) ->
receive
{body_bytes, Ref, _} ->