author     Dirkjan Ochtman <djc@apache.org>    2013-10-03 20:47:09 +0200
committer  Dirkjan Ochtman <djc@apache.org>    2013-10-03 20:47:35 +0200
commit     4ca2ceccfb360c76f4e6033d93fa9fedd34d9ce7 (patch)
tree       d4bec7a259989d3b8e2ac6e9448b7946eac65adc
parent     1c3ddcb6be3135379249a2261e6ad8ed59945bc2 (diff)
parent     df0423b5d5c35170626cfc0c10216d2e72e3a83e (diff)
download   couchdb-4ca2ceccfb360c76f4e6033d93fa9fedd34d9ce7.tar.gz
Merge branch '1901-atomic-multipart-retries' of https://github.com/apache/couchdb
Merged after r+ from rnewson on IRC, general trustworthiness of process.
-rw-r--r--  src/couch_replicator/src/couch_replicator_api_wrap.erl | 165
-rw-r--r--  src/couch_replicator/src/couch_replicator_worker.erl   |   4
2 files changed, 133 insertions, 36 deletions
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 90cfa8e5a..52e15b7fa 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -48,6 +48,7 @@
     get_value/3
 ]).
 
+-define(MAX_WAIT, 5 * 60 * 1000).
 
 db_uri(#httpdb{url = Url}) ->
     couch_util:url_strip_password(Url);
@@ -157,34 +158,93 @@ get_missing_revs(Db, IdRevs) ->
 
+open_doc_revs(#httpdb{retries = 0} = HttpDb, Id, Revs, Options, _Fun, _Acc) ->
+    Path = encode_doc_id(Id),
+    QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+    Url = couch_util:url_strip_password(
+        couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}])
+    ),
+    ?LOG_ERROR("Replication crashing because GET ~s failed", [Url]),
+    exit(kaboom);
 open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
     Path = encode_doc_id(Id),
-    QArgs = options_to_query_args(
-        HttpDb, Path, [revs, {open_revs, Revs} | Options]),
-    Self = self(),
-    Streamer = spawn_link(fun() ->
-        send_req(
-            HttpDb,
-            [{path, Path}, {qs, QArgs},
-                {ibrowse_options, [{stream_to, {self(), once}}]},
-                {headers, [{"Accept", "multipart/mixed"}]}],
-            fun(200, Headers, StreamDataFun) ->
-                remote_open_doc_revs_streamer_start(Self),
-                {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
-                    get_value("Content-Type", Headers),
-                    StreamDataFun,
-                    fun mp_parse_mixed/1)
-            end),
-        unlink(Self)
+    QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+    {Pid, Ref} = spawn_monitor(fun() ->
+        Self = self(),
+        Callback = fun(200, Headers, StreamDataFun) ->
+            remote_open_doc_revs_streamer_start(Self),
+            {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
+                get_value("Content-Type", Headers),
+                StreamDataFun,
+                fun mp_parse_mixed/1
+            )
+        end,
+        Streamer = spawn_link(fun() ->
+            Params = [
+                {path, Path},
+                {qs, QS},
+                {ibrowse_options, [{stream_to, {self(), once}}]},
+                {headers, [{"Accept", "multipart/mixed"}]}
+            ],
+            % We're setting retries to 0 here to avoid the case where the
+            % Streamer retries the request and ends up jumbling together two
+            % different response bodies. Retries are handled explicitly by
+            % open_doc_revs itself.
+            send_req(HttpDb#httpdb{retries = 0}, Params, Callback)
+        end),
+        % If this process dies normally we can leave
+        % the Streamer process hanging around keeping an
+        % HTTP connection open. This is a bit of a
+        % hammer approach to making sure it releases
+        % that connection back to the pool.
+        spawn(fun() ->
+            Ref = erlang:monitor(process, Self),
+            receive
+                {'DOWN', Ref, process, Self, normal} ->
+                    exit(Streamer, {streamer_parent_died, Self});
+                {'DOWN', Ref, process, Self, _} ->
+                    ok
+            end
+        end),
+        receive
+            {started_open_doc_revs, Ref} ->
+                Ret = receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc),
+                exit({exit_ok, Ret})
+        end
+    end),
     receive
-        {started_open_doc_revs, Ref} ->
-            receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc)
+        {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
+            Ret;
+        {'DOWN', Ref, process, Pid, {{nocatch, {missing_stub,_} = Stub}, _}} ->
+            throw(Stub);
+        {'DOWN', Ref, process, Pid, Else} ->
+            Url = couch_util:url_strip_password(
+                couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}])
+            ),
+            #httpdb{retries = Retries, wait = Wait0} = HttpDb,
+            Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT),
+            ?LOG_INFO("Retrying GET to ~s in ~p seconds due to error ~p",
+                [Url, Wait / 1000, error_reason(Else)]
+            ),
+            ok = timer:sleep(Wait),
+            RetryDb = HttpDb#httpdb{
+                retries = Retries - 1,
+                wait = Wait
+            },
+            open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc)
     end;
 open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
     {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
     {ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}.
 
+error_reason({http_request_failed, "GET", _Url, {error, timeout}}) ->
+    timeout;
+error_reason({http_request_failed, "GET", _Url, {error, {_, req_timedout}}}) ->
+    req_timedout;
+error_reason({http_request_failed, "GET", _Url, Error}) ->
+    Error;
+error_reason(Else) ->
+    Else.
 
 open_doc(#httpdb{} = Db, Id, Options) ->
     send_req(
@@ -228,7 +288,11 @@ update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
         end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
     Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
     send_req(
-        HttpDb,
+        % A crash here bubbles all the way back up to run_user_fun inside
+        % open_doc_revs, which will retry the whole thing. That's the
+        % appropriate course of action, since we've already started streaming
+        % the response body from the GET request.
+        HttpDb#httpdb{retries = 0},
         [{method, put}, {path, encode_doc_id(DocId)}, {qs, QArgs},
             {headers, Headers}, {body, Body}],
         fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 ->
@@ -465,6 +529,8 @@ options_to_query_args([revs | Rest], Acc) ->
     options_to_query_args(Rest, [{"revs", "true"} | Acc]);
 options_to_query_args([{open_revs, all} | Rest], Acc) ->
     options_to_query_args(Rest, [{"open_revs", "all"} | Acc]);
+options_to_query_args([latest | Rest], Acc) ->
+    options_to_query_args(Rest, [{"latest", "true"} | Acc]);
 options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
     JsonRevs = ?b2l(?JSON_ENCODE(couch_doc:revs_to_strs(Revs))),
     options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]).
@@ -519,7 +585,7 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) ->
             fun() -> receive_doc_data(Streamer, Ref) end, Ref) of
         {ok, Doc, Parser} ->
-            case UserFun({ok, Doc}, UserAcc) of
+            case run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref) of
            {ok, UserAcc2} ->
                ok;
            {skip, UserAcc2} ->
@@ -530,13 +596,13 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) ->
        {"application/json", []} ->
            Doc = couch_doc:from_json_obj(
                ?JSON_DECODE(receive_all(Streamer, Ref, []))),
-           {_, UserAcc2} = UserFun({ok, Doc}, UserAcc),
+           {_, UserAcc2} = run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref),
            receive_docs(Streamer, UserFun, Ref, UserAcc2);
        {"application/json", [{"error","true"}]} ->
            {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
            Rev = get_value(<<"missing">>, ErrorProps),
            Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
-           {_, UserAcc2} = UserFun(Result, UserAcc),
+           {_, UserAcc2} = run_user_fun(UserFun, Result, UserAcc, Ref),
            receive_docs(Streamer, UserFun, Ref, UserAcc2)
        end;
     {done, Ref} ->
@@ -544,6 +610,36 @@
     end.
 
 
+run_user_fun(UserFun, Arg, UserAcc, OldRef) ->
+    {Pid, Ref} = spawn_monitor(fun() ->
+        try UserFun(Arg, UserAcc) of
+            Resp ->
+                exit({exit_ok, Resp})
+        catch
+            throw:Reason ->
+                exit({exit_throw, Reason});
+            error:Reason ->
+                exit({exit_error, Reason});
+            exit:Reason ->
+                exit({exit_exit, Reason})
+        end
+    end),
+    receive
+        {started_open_doc_revs, NewRef} ->
+            erlang:demonitor(Ref, [flush]),
+            exit(Pid, kill),
+            restart_remote_open_doc_revs(OldRef, NewRef);
+        {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
+            Ret;
+        {'DOWN', Ref, process, Pid, {exit_throw, Reason}} ->
+            throw(Reason);
+        {'DOWN', Ref, process, Pid, {exit_error, Reason}} ->
+            erlang:error(Reason);
+        {'DOWN', Ref, process, Pid, {exit_exit, Reason}} ->
+            erlang:exit(Reason)
+    end.
+
+
 restart_remote_open_doc_revs(Ref, NewRef) ->
     receive
     {body_bytes, Ref, _} ->
@@ -631,6 +727,7 @@ doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
     {doc_bytes, Ref, DocBytes} ->
         Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
         ReadAttachmentDataFun = fun() ->
+            link(Parser),
            Parser ! {get_bytes, Ref, self()},
            receive
            {started_open_doc_revs, NewRef} ->
@@ -747,6 +844,12 @@ rev_to_str({_Pos, _Id} = Rev) ->
 rev_to_str(Rev) ->
     Rev.
 
+write_fun() ->
+    fun(Data) ->
+        receive {get_data, Ref, From} ->
+            From ! {data, Ref, Data}
+        end
+    end.
 
 stream_doc({JsonBytes, Atts, Boundary, Len}) ->
     case erlang:erase({doc_streamer, Boundary}) of
@@ -756,17 +859,11 @@ stream_doc({JsonBytes, Atts, Boundary, Len}) ->
     _ ->
         ok
     end,
-    Self = self(),
-    DocStreamer = spawn_link(fun() ->
-        couch_doc:doc_to_multi_part_stream(
-            Boundary, JsonBytes, Atts,
-            fun(Data) ->
-                receive {get_data, Ref, From} ->
-                    From ! {data, Ref, Data}
-                end
-            end, true),
-        unlink(Self)
-    end),
+    DocStreamer = spawn_link(
+        couch_doc,
+        doc_to_multi_part_stream,
+        [Boundary, JsonBytes, Atts, write_fun(), true]
+    ),
     erlang:put({doc_streamer, Boundary}, DocStreamer),
     {ok, <<>>, {Len, Boundary}};
 stream_doc({0, Id}) ->
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index e8a3570ba..d66d47805 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -296,13 +296,13 @@ spawn_doc_reader(Source, Target, FetchParams) ->
 
 fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
     try
         couch_replicator_api_wrap:open_doc_revs(
-            Source, Id, Revs, [{atts_since, PAs}], DocHandler, Acc)
+            Source, Id, Revs, [{atts_since, PAs}, latest], DocHandler, Acc)
     catch
     throw:{missing_stub, _} ->
         ?LOG_ERROR("Retrying fetch and update of document `~s` due to out of "
             "sync attachment stubs. Missing revisions are: ~s",
             [Id, couch_doc:revs_to_strs(Revs)]),
-        couch_replicator_api_wrap:open_doc_revs(Source, Id, Revs, [], DocHandler, Acc)
+        couch_replicator_api_wrap:open_doc_revs(Source, Id, Revs, [latest], DocHandler, Acc)
     end.