author     Dirkjan Ochtman <djc@apache.org>    2013-10-03 20:47:09 +0200
committer  Dirkjan Ochtman <djc@apache.org>    2013-10-03 20:47:35 +0200
commit     4ca2ceccfb360c76f4e6033d93fa9fedd34d9ce7 (patch)
tree       d4bec7a259989d3b8e2ac6e9448b7946eac65adc
parent     1c3ddcb6be3135379249a2261e6ad8ed59945bc2 (diff)
parent     df0423b5d5c35170626cfc0c10216d2e72e3a83e (diff)
download   couchdb-4ca2ceccfb360c76f4e6033d93fa9fedd34d9ce7.tar.gz
Merge branch '1901-atomic-multipart-retries' of https://github.com/apache/couchdb
Merged after r+ from rnewson on IRC, general trustworthiness of process.
-rw-r--r--  src/couch_replicator/src/couch_replicator_api_wrap.erl  165
-rw-r--r--  src/couch_replicator/src/couch_replicator_worker.erl      4
2 files changed, 133 insertions(+), 36 deletions(-)
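In brief, the patch makes the replicator's multipart GET for open_revs fetches atomic with respect to retries: open_doc_revs runs the entire fetch in a monitored child process and, on any failure, sleeps with exponential backoff and reissues the whole request from scratch, with ibrowse-level retries disabled so two response bodies can never be spliced together. User callbacks are isolated in run_user_fun so a mid-stream restart can interrupt them cleanly, and the GET now passes latest=true so a retry after a missing_stub error picks up the current leaf revisions.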
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 90cfa8e5a..52e15b7fa 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -48,6 +48,7 @@
get_value/3
]).
+-define(MAX_WAIT, 5 * 60 * 1000).
db_uri(#httpdb{url = Url}) ->
couch_util:url_strip_password(Url);
@@ -157,34 +158,93 @@ get_missing_revs(Db, IdRevs) ->
+open_doc_revs(#httpdb{retries = 0} = HttpDb, Id, Revs, Options, _Fun, _Acc) ->
+ Path = encode_doc_id(Id),
+ QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+ Url = couch_util:url_strip_password(
+ couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}])
+ ),
+ ?LOG_ERROR("Replication crashing because GET ~s failed", [Url]),
+ exit(kaboom);
open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
Path = encode_doc_id(Id),
- QArgs = options_to_query_args(
- HttpDb, Path, [revs, {open_revs, Revs} | Options]),
- Self = self(),
- Streamer = spawn_link(fun() ->
- send_req(
- HttpDb,
- [{path, Path}, {qs, QArgs},
- {ibrowse_options, [{stream_to, {self(), once}}]},
- {headers, [{"Accept", "multipart/mixed"}]}],
- fun(200, Headers, StreamDataFun) ->
- remote_open_doc_revs_streamer_start(Self),
- {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
- get_value("Content-Type", Headers),
- StreamDataFun,
- fun mp_parse_mixed/1)
- end),
- unlink(Self)
+ QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+ {Pid, Ref} = spawn_monitor(fun() ->
+ Self = self(),
+ Callback = fun(200, Headers, StreamDataFun) ->
+ remote_open_doc_revs_streamer_start(Self),
+ {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
+ get_value("Content-Type", Headers),
+ StreamDataFun,
+ fun mp_parse_mixed/1
+ )
+ end,
+ Streamer = spawn_link(fun() ->
+ Params = [
+ {path, Path},
+ {qs, QS},
+ {ibrowse_options, [{stream_to, {self(), once}}]},
+ {headers, [{"Accept", "multipart/mixed"}]}
+ ],
+ % We're setting retries to 0 here to avoid the case where the
+ % Streamer retries the request and ends up jumbling together two
+ % different response bodies. Retries are handled explicitly by
+ % open_doc_revs itself.
+ send_req(HttpDb#httpdb{retries = 0}, Params, Callback)
+ end),
+ % If this process dies normally we can leave
+ % the Streamer process hanging around keeping an
+ % HTTP connection open. This is a bit of a
+ % hammer approach to making sure it releases
+ % that connection back to the pool.
+ spawn(fun() ->
+ Ref = erlang:monitor(process, Self),
+ receive
+ {'DOWN', Ref, process, Self, normal} ->
+ exit(Streamer, {streamer_parent_died, Self});
+ {'DOWN', Ref, process, Self, _} ->
+ ok
+ end
end),
+ receive
+ {started_open_doc_revs, Ref} ->
+ Ret = receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc),
+ exit({exit_ok, Ret})
+ end
+ end),
receive
- {started_open_doc_revs, Ref} ->
- receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc)
+ {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
+ Ret;
+ {'DOWN', Ref, process, Pid, {{nocatch, {missing_stub,_} = Stub}, _}} ->
+ throw(Stub);
+ {'DOWN', Ref, process, Pid, Else} ->
+ Url = couch_util:url_strip_password(
+ couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}])
+ ),
+ #httpdb{retries = Retries, wait = Wait0} = HttpDb,
+ Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT),
+ ?LOG_INFO("Retrying GET to ~s in ~p seconds due to error ~p",
+ [Url, Wait / 1000, error_reason(Else)]
+ ),
+ ok = timer:sleep(Wait),
+ RetryDb = HttpDb#httpdb{
+ retries = Retries - 1,
+ wait = Wait
+ },
+ open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc)
end;
open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
{ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
{ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}.
+error_reason({http_request_failed, "GET", _Url, {error, timeout}}) ->
+ timeout;
+error_reason({http_request_failed, "GET", _Url, {error, {_, req_timedout}}}) ->
+ req_timedout;
+error_reason({http_request_failed, "GET", _Url, Error}) ->
+ Error;
+error_reason(Else) ->
+ Else.
open_doc(#httpdb{} = Db, Id, Options) ->
send_req(
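The retry arithmetic above doubles the previous wait, clamps at ?MAX_WAIT (five minutes), then doubles once more, so successive sleeps roughly quadruple and top out at ten minutes; when the retry budget reaches zero, the first clause of open_doc_revs logs the URL and exits with kaboom. A standalone sketch of that schedule (module name and starting wait are illustrative, not from the patch):

    -module(backoff_sketch).
    -export([schedule/2]).

    -define(MAX_WAIT, 5 * 60 * 1000).

    %% Sleep intervals (in ms) a GET sees before its retry budget runs out.
    schedule(_Wait0, 0) ->
        [];
    schedule(Wait0, Retries) when Retries > 0 ->
        %% Same formula as open_doc_revs: double, clamp, double again.
        Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT),
        [Wait | schedule(Wait, Retries - 1)].

For example, backoff_sketch:schedule(250, 5) returns [1000, 4000, 16000, 64000, 256000].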
@@ -228,7 +288,11 @@ update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
send_req(
- HttpDb,
+ % A crash here bubbles all the way back up to run_user_fun inside
+ % open_doc_revs, which will retry the whole thing. That's the
+ % appropriate course of action, since we've already started streaming
+ % the response body from the GET request.
+ HttpDb#httpdb{retries = 0},
[{method, put}, {path, encode_doc_id(DocId)},
{qs, QArgs}, {headers, Headers}, {body, Body}],
fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 ->
@@ -465,6 +529,8 @@ options_to_query_args([revs | Rest], Acc) ->
options_to_query_args(Rest, [{"revs", "true"} | Acc]);
options_to_query_args([{open_revs, all} | Rest], Acc) ->
options_to_query_args(Rest, [{"open_revs", "all"} | Acc]);
+options_to_query_args([latest | Rest], Acc) ->
+ options_to_query_args(Rest, [{"latest", "true"} | Acc]);
options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
JsonRevs = ?b2l(?JSON_ENCODE(couch_doc:revs_to_strs(Revs))),
options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]).
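The new latest clause lets callers pass the bare atom latest and have it rendered as latest=true on the source GET, asking for the current leaf of each requested revision rather than the possibly-stale revision itself. An illustrative request line under that option set (database, document, and revision are hypothetical, and the URL-encoding of the JSON revision list is elided):

    GET /sourcedb/mydoc?revs=true&open_revs=["1-abc"]&latest=true HTTP/1.1
    Accept: multipart/mixed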
@@ -519,7 +585,7 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) ->
fun() -> receive_doc_data(Streamer, Ref) end,
Ref) of
{ok, Doc, Parser} ->
- case UserFun({ok, Doc}, UserAcc) of
+ case run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref) of
{ok, UserAcc2} ->
ok;
{skip, UserAcc2} ->
@@ -530,13 +596,13 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) ->
{"application/json", []} ->
Doc = couch_doc:from_json_obj(
?JSON_DECODE(receive_all(Streamer, Ref, []))),
- {_, UserAcc2} = UserFun({ok, Doc}, UserAcc),
+ {_, UserAcc2} = run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref),
receive_docs(Streamer, UserFun, Ref, UserAcc2);
{"application/json", [{"error","true"}]} ->
{ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
Rev = get_value(<<"missing">>, ErrorProps),
Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
- {_, UserAcc2} = UserFun(Result, UserAcc),
+ {_, UserAcc2} = run_user_fun(UserFun, Result, UserAcc, Ref),
receive_docs(Streamer, UserFun, Ref, UserAcc2)
end;
{done, Ref} ->
@@ -544,6 +610,36 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) ->
end.
+run_user_fun(UserFun, Arg, UserAcc, OldRef) ->
+ {Pid, Ref} = spawn_monitor(fun() ->
+ try UserFun(Arg, UserAcc) of
+ Resp ->
+ exit({exit_ok, Resp})
+ catch
+ throw:Reason ->
+ exit({exit_throw, Reason});
+ error:Reason ->
+ exit({exit_error, Reason});
+ exit:Reason ->
+ exit({exit_exit, Reason})
+ end
+ end),
+ receive
+ {started_open_doc_revs, NewRef} ->
+ erlang:demonitor(Ref, [flush]),
+ exit(Pid, kill),
+ restart_remote_open_doc_revs(OldRef, NewRef);
+ {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
+ Ret;
+ {'DOWN', Ref, process, Pid, {exit_throw, Reason}} ->
+ throw(Reason);
+ {'DOWN', Ref, process, Pid, {exit_error, Reason}} ->
+ erlang:error(Reason);
+ {'DOWN', Ref, process, Pid, {exit_exit, Reason}} ->
+ erlang:exit(Reason)
+ end.
+
+
restart_remote_open_doc_revs(Ref, NewRef) ->
receive
{body_bytes, Ref, _} ->
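run_user_fun exists so the receive loop can keep listening for {started_open_doc_revs, NewRef} while the user callback runs: if the streamer restarts mid-callback, the callback process is killed and the loop resynchronizes via restart_remote_open_doc_revs. Stripped of that restart handling, the underlying pattern is to run a fun in a monitored child and re-raise its outcome in the caller with the original exception class preserved. A self-contained sketch (the function name is ours, not the module's):

    isolated_call(Fun) ->
        {Pid, Ref} = spawn_monitor(fun() ->
            try Fun() of
                Resp -> exit({exit_ok, Resp})
            catch
                throw:Reason -> exit({exit_throw, Reason});
                error:Reason -> exit({exit_error, Reason});
                exit:Reason  -> exit({exit_exit, Reason})
            end
        end),
        receive
            {'DOWN', Ref, process, Pid, {exit_ok, Ret}}       -> Ret;
            {'DOWN', Ref, process, Pid, {exit_throw, Reason}} -> throw(Reason);
            {'DOWN', Ref, process, Pid, {exit_error, Reason}} -> erlang:error(Reason);
            {'DOWN', Ref, process, Pid, {exit_exit, Reason}}  -> erlang:exit(Reason)
        end.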
@@ -631,6 +727,7 @@ doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
{doc_bytes, Ref, DocBytes} ->
Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
ReadAttachmentDataFun = fun() ->
+ link(Parser),
Parser ! {get_bytes, Ref, self()},
receive
{started_open_doc_revs, NewRef} ->
@@ -747,6 +844,12 @@ rev_to_str({_Pos, _Id} = Rev) ->
rev_to_str(Rev) ->
Rev.
+write_fun() ->
+ fun(Data) ->
+ receive {get_data, Ref, From} ->
+ From ! {data, Ref, Data}
+ end
+ end.
stream_doc({JsonBytes, Atts, Boundary, Len}) ->
case erlang:erase({doc_streamer, Boundary}) of
@@ -756,17 +859,11 @@ stream_doc({JsonBytes, Atts, Boundary, Len}) ->
_ ->
ok
end,
- Self = self(),
- DocStreamer = spawn_link(fun() ->
- couch_doc:doc_to_multi_part_stream(
- Boundary, JsonBytes, Atts,
- fun(Data) ->
- receive {get_data, Ref, From} ->
- From ! {data, Ref, Data}
- end
- end, true),
- unlink(Self)
- end),
+ DocStreamer = spawn_link(
+ couch_doc,
+ doc_to_multi_part_stream,
+ [Boundary, JsonBytes, Atts, write_fun(), true]
+ ),
erlang:put({doc_streamer, Boundary}, DocStreamer),
{ok, <<>>, {Len, Boundary}};
stream_doc({0, Id}) ->
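The stream_doc change is behavior-preserving: the anonymous write callback becomes the named write_fun/0, and the streamer is started with spawn_link(Module, Function, Args) instead of a closure that ends in unlink(Self). The unlink was never needed, since a linked process that exits with reason normal does not take its peer down; a quick shell check of that property (pids illustrative):

    1> self().
    <0.84.0>
    2> spawn_link(fun() -> ok end), timer:sleep(100), self().
    <0.84.0>

The shell is still the same process after the linked child's normal exit.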
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index e8a3570ba..d66d47805 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -296,13 +296,13 @@ spawn_doc_reader(Source, Target, FetchParams) ->
fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
try
couch_replicator_api_wrap:open_doc_revs(
- Source, Id, Revs, [{atts_since, PAs}], DocHandler, Acc)
+ Source, Id, Revs, [{atts_since, PAs}, latest], DocHandler, Acc)
catch
throw:{missing_stub, _} ->
?LOG_ERROR("Retrying fetch and update of document `~s` due to out of "
"sync attachment stubs. Missing revisions are: ~s",
[Id, couch_doc:revs_to_strs(Revs)]),
- couch_replicator_api_wrap:open_doc_revs(Source, Id, Revs, [], DocHandler, Acc)
+ couch_replicator_api_wrap:open_doc_revs(Source, Id, Revs, [latest], DocHandler, Acc)
end.
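On the worker side, threading latest through to open_doc_revs closes the loop on the missing_stub retry: when the target rejects a PUT because attachment stubs are out of sync, fetch_doc reissues the GET, and latest=true ensures the retry fetches the revisions' current leaves instead of re-fetching the same stale bodies and failing identically.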