summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2013-09-10 22:26:14 -0400
committerAdam Kocoloski <adam@cloudant.com>2013-10-02 12:11:24 -0400
commita3a2a5f6082f2bda9185642a344fd28af9acb348 (patch)
tree8016de3534af4c425c8f3798eca7a371435ddbb2
parent8d6c59d55b097d8db23adef5077df4871e25c34d (diff)
downloadcouchdb-a3a2a5f6082f2bda9185642a344fd28af9acb348.tar.gz
Handle open_revs retries at a higher level
This patch disables the httpc client retries for the request to stream document revisions to the replicator. The retry logic at that level could end up jumbling together response body data from different requests and thoroughly confusing the multipart parser. Moving the retry logic up a level allows us to start fresh each time. BugzID: 21367
-rw-r--r--src/couch_replicator/src/couch_replicator_api_wrap.erl98
1 files changed, 67 insertions, 31 deletions
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 99215609a..28f96c079 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -48,6 +48,7 @@
get_value/3
]).
+-define(MAX_WAIT, 5 * 60 * 1000).
db_uri(#httpdb{url = Url}) ->
couch_util:url_strip_password(Url);
@@ -157,43 +158,78 @@ get_missing_revs(Db, IdRevs) ->
+open_doc_revs(#httpdb{retries = 0} = HttpDb, Id, Revs, Options, _Fun, _Acc) ->
+ Path = encode_doc_id(Id),
+ QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+ Url = couch_util:url_strip_password(
+ couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}])
+ ),
+ ?LOG_ERROR("Replication crashing because GET ~s failed", [Url]),
+ exit(kaboom);
open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
Path = encode_doc_id(Id),
- QArgs = options_to_query_args(
- HttpDb, Path, [revs, {open_revs, Revs} | Options]),
- Self = self(),
- Streamer = spawn_link(fun() ->
- send_req(
- HttpDb,
- [{path, Path}, {qs, QArgs},
- {ibrowse_options, [{stream_to, {self(), once}}]},
- {headers, [{"Accept", "multipart/mixed"}]}],
- fun(200, Headers, StreamDataFun) ->
- remote_open_doc_revs_streamer_start(Self),
- {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
- get_value("Content-Type", Headers),
- StreamDataFun,
- fun mp_parse_mixed/1)
- end),
- unlink(Self)
+ QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+ {Pid, Ref} = spawn_monitor(fun() ->
+ Self = self(),
+ Callback = fun(200, Headers, StreamDataFun) ->
+ remote_open_doc_revs_streamer_start(Self),
+ {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
+ get_value("Content-Type", Headers),
+ StreamDataFun,
+ fun mp_parse_mixed/1
+ )
+ end,
+ Streamer = spawn_link(fun() ->
+ Params = [
+ {path, Path},
+ {qs, QS},
+ {ibrowse_options, [{stream_to, {self(), once}}]},
+ {headers, [{"Accept", "multipart/mixed"}]}
+ ],
+ % We're setting retries to 0 here to avoid the case where the
+ % Streamer retries the request and ends up jumbling together two
+ % different response bodies. Retries are handled explicitly by
+ % open_doc_revs itself.
+ send_req(HttpDb#httpdb{retries = 0}, Params, Callback)
+ end),
+ % If this process dies normally we can leave
+ % the Streamer process hanging around keeping an
+ % HTTP connection open. This is a bit of a
+ % hammer approach to making sure it releases
+ % that connection back to the pool.
+ spawn(fun() ->
+ Ref = erlang:monitor(process, Self),
+ receive
+ {'DOWN', Ref, process, Self, normal} ->
+ exit(Streamer, {streamer_parent_died, Self});
+ {'DOWN', Ref, process, Self, _} ->
+ ok
+ end
end),
- % If this process dies normally we can leave
- % the Streamer process hanging around keeping an
- % HTTP connection open. This is a bit of a
- % hammer approach to making sure it releases
- % that connection back to the pool.
- spawn(fun() ->
- Ref = erlang:monitor(process, Self),
receive
- {'DOWN', Ref, process, Self, normal} ->
- exit(Streamer, {streamer_parent_died, Self});
- {'DOWN', Ref, process, Self, _} ->
- ok
- end
+ {started_open_doc_revs, Ref} ->
+ Ret = receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc),
+ exit({exit_ok, Ret})
+ end
end),
receive
- {started_open_doc_revs, Ref} ->
- receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc)
+ {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
+ Ret;
+ {'DOWN', Ref, process, Pid, Else} ->
+ Url = couch_util:url_strip_password(
+ couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}])
+ ),
+ #httpdb{retries = Retries, wait = Wait0} = HttpDb,
+ Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT),
+ twig:log(notice,"Retrying GET to ~s in ~p seconds due to error ~s",
+ [Url, Wait / 1000, Else]
+ ),
+ ok = timer:sleep(Wait),
+ RetryDb = HttpDb#httpdb{
+ retries = Retries - 1,
+ wait = Wait
+ },
+ open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc)
end;
open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
{ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),