diff options
author | Ronny <ronny@apache.org> | 2022-05-19 16:00:28 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-19 16:00:28 +0200 |
commit | 53228f82400c7451a8fb1d2f45c9734c8a9eae1d (patch) | |
tree | 74564d45753e7738c8816ff7579cdc5fa00dc17a | |
parent | f3b5f2755588f2f31951e05bf925f303101f9ffd (diff) | |
parent | 6e0c80a42b1c2364e1b4930448db528e2739af49 (diff) | |
download | couchdb-53228f82400c7451a8fb1d2f45c9734c8a9eae1d.tar.gz |
Merge branch '3.x' into big-r81-patch-1
-rw-r--r-- | src/chttpd/src/chttpd_db.erl | 32 | ||||
-rw-r--r-- | src/couch/src/couch_httpd_multipart.erl | 30 | ||||
-rw-r--r-- | src/fabric/src/fabric_rpc.erl | 46 | ||||
-rw-r--r-- | test/elixir/test/attachments_multipart_test.exs | 67 | ||||
-rw-r--r-- | test/elixir/test/config/suite.elixir | 3 |
5 files changed, 141 insertions, 37 deletions
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index 9c9c4ef87..4392df194 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -1206,9 +1206,16 @@ db_doc_req(#httpd{method = 'PUT', user_ctx = Ctx} = Req, Db, DocId) -> ), Doc = couch_doc_from_req(Req, Db, DocId, Doc0), try - Result = send_updated_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType), + {HttpCode, RespHeaders1, RespBody} = update_doc_req( + Req, + Db, + DocId, + Doc, + RespHeaders, + UpdateType + ), WaitFun(), - Result + send_json(Req, HttpCode, RespHeaders1, RespBody) catch throw:Err -> % Document rejected by a validate_doc_update function. @@ -1534,14 +1541,12 @@ send_updated_doc(Req, Db, DocId, Json) -> send_updated_doc(Req, Db, DocId, Doc, Headers) -> send_updated_doc(Req, Db, DocId, Doc, Headers, interactive_edit). -send_updated_doc( +send_updated_doc(Req, Db, DocId, Doc, Headers, Type) -> + {Code, Headers1, Body} = update_doc_req(Req, Db, DocId, Doc, Headers, Type), + send_json(Req, Code, Headers1, Body). + +update_doc_req(Req, Db, DocId, Doc, Headers, UpdateType) -> #httpd{user_ctx = Ctx} = Req, - Db, - DocId, - #doc{deleted = Deleted} = Doc, - Headers, - UpdateType -) -> W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), Options = case couch_httpd:header_value(Req, "X-Couch-Full-Commit") of @@ -1552,15 +1557,10 @@ send_updated_doc( _ -> [UpdateType, {user_ctx, Ctx}, {w, W}] end, - {Status, {etag, Etag}, Body} = update_doc( - Db, - DocId, - #doc{deleted = Deleted} = Doc, - Options - ), + {Status, {etag, Etag}, Body} = update_doc(Db, DocId, Doc, Options), HttpCode = http_code_from_status(Status), ResponseHeaders = [{"ETag", Etag} | Headers], - send_json(Req, HttpCode, ResponseHeaders, Body). + {HttpCode, ResponseHeaders, Body}. http_code_from_status(Status) -> case Status of diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl index 95a2c9e3c..11ee6790e 100644 --- a/src/couch/src/couch_httpd_multipart.erl +++ b/src/couch/src/couch_httpd_multipart.erl @@ -104,6 +104,9 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) -> receive abort_parsing -> ok; + {hello_from_writer, Ref, WriterPid} -> + NewCounters = handle_hello(WriterPid, Counters), + mp_parse_atts(eof, {Ref, Chunks, Offset, NewCounters, Waiting}); {get_bytes, Ref, From} -> C2 = update_writer(From, Counters), case maybe_send_data({Ref, Chunks, Offset, C2, [From | Waiting]}) of @@ -134,6 +137,9 @@ mp_abort_parse_atts(_, _) -> maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> receive + {hello_from_writer, Ref, WriterPid} -> + NewCounters = handle_hello(WriterPid, Counters), + maybe_send_data({Ref, Chunks, Offset, NewCounters, Waiting}); {get_bytes, Ref, From} -> NewCounters = update_writer(From, Counters), maybe_send_data({Ref, Chunks, Offset, NewCounters, [From | Waiting]}) @@ -195,6 +201,9 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting}, maybe_send_data(NewAcc) end; + {hello_from_writer, Ref, WriterPid} -> + C2 = handle_hello(WriterPid, Counters), + maybe_send_data({Ref, NewChunks, NewOffset, C2, Waiting}); {get_bytes, Ref, X} -> C2 = update_writer(X, Counters), maybe_send_data({Ref, NewChunks, NewOffset, C2, [X | NewWaiting]}) @@ -204,17 +213,18 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> end end. +handle_hello(WriterPid, Counters) -> + WriterRef = erlang:monitor(process, WriterPid), + orddict:store(WriterPid, {WriterRef, 0}, Counters). + update_writer(WriterPid, Counters) -> - UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end, - InitialValue = - case orddict:find(WriterPid, Counters) of - {ok, IV} -> - IV; - error -> - WriterRef = erlang:monitor(process, WriterPid), - {WriterRef, 1} - end, - orddict:update(WriterPid, UpdateFun, InitialValue, Counters). + case orddict:find(WriterPid, Counters) of + {ok, {WriterRef, Count}} -> + orddict:store(WriterPid, {WriterRef, Count + 1}, Counters); + error -> + WriterRef = erlang:monitor(process, WriterPid), + orddict:store(WriterPid, {WriterRef, 1}, Counters) + end. remove_writer(WriterPid, WriterRef, Counters) -> case orddict:find(WriterPid, Counters) of diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index a90c94ade..cfda1a37e 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -637,23 +637,49 @@ make_att_readers([#doc{atts = Atts0} = Doc | Rest]) -> Atts = [couch_att:transform(data, fun make_att_reader/1, Att) || Att <- Atts0], [Doc#doc{atts = Atts} | make_att_readers(Rest)]. -make_att_reader({follows, Parser, Ref}) -> +make_att_reader({follows, Parser, Ref}) when is_pid(Parser) -> + % This code will fail if the returned closure is called by a + % process other than the one that called make_att_reader/1 in the + % first place. The reason we don't put everything inside the + % closure is that the `hello_from_writer` message must *always* be + % sent to the parser, even if the closure never gets called. Also, + % make sure `hello_from_writer` is sent only once for the all the + % rest of the possible attachments. + WriterPid = self(), + ParserRef = + case get({mp_parser_ref, Parser}) of + undefined -> + % First time encountering a particular parser pid. Monitor it, + % in case it dies, and notify it about us, so it could monitor + % us in case we die. + PRef = erlang:monitor(process, Parser), + put({mp_parser_ref, Parser}, PRef), + Parser ! {hello_from_writer, Ref, WriterPid}, + PRef; + Else -> + Else + end, fun() -> - ParserRef = - case get(mp_parser_ref) of - undefined -> - PRef = erlang:monitor(process, Parser), - put(mp_parser_ref, PRef), - PRef; - Else -> - Else - end, + % Make sure the closure is always called from the same process which + % sent the hello_from_writer message. + case self() =:= WriterPid of + true -> ok; + false -> error({make_att_pid_assertion, self(), WriterPid}) + end, + % Check if parser already died. This is for belt and suspenders mostly, + % in case somehow we call the data function again after mp_parser_died + % was thrown, so we are not stuck forever waiting for bytes. + case get({mp_parser_died, Parser}) of + undefined -> ok; + AlreadyDiedReason -> throw({mp_parser_died, AlreadyDiedReason}) + end, Parser ! {get_bytes, Ref, self()}, receive {bytes, Ref, Bytes} -> rexi:reply(attachment_chunk_received), Bytes; {'DOWN', ParserRef, _, _, Reason} -> + put({mp_parser_died, Parser}, Reason), throw({mp_parser_died, Reason}) end end; diff --git a/test/elixir/test/attachments_multipart_test.exs b/test/elixir/test/attachments_multipart_test.exs index f7d5d9519..2cedef513 100644 --- a/test/elixir/test/attachments_multipart_test.exs +++ b/test/elixir/test/attachments_multipart_test.exs @@ -260,6 +260,73 @@ defmodule AttachmentMultipartTest do ) end + @tag :with_db + test "multipart attachments with new_edits=false", context do + db_name = context[:db_name] + + att_data = String.duplicate("x", 100_000) + att_len = byte_size(att_data) + document = """ + { + "body": "This is a body.", + "_attachments": { + "foo.txt": { + "follows": true, + "content_type": "application/test", + "length": #{att_len} + } + } + } + """ + + multipart_data = + "--abc123\r\n" <> + "content-type: application/json\r\n" <> + "\r\n" <> + document <> + "\r\n--abc123\r\n" <> + "\r\n" <> + att_data <> + "\r\n--abc123--epilogue" + + resp = + Couch.put( + "/#{db_name}/multipart_replicated_changes", + body: multipart_data, + headers: ["Content-Type": "multipart/related;boundary=\"abc123\""] + ) + + assert resp.status_code in [201, 202] + assert resp.body["ok"] == true + + rev = resp.body["rev"] + + resp = Couch.get("/#{db_name}/multipart_replicated_changes/foo.txt") + + assert resp.body == att_data + + # https://github.com/apache/couchdb/issues/3939 + # Repeating the request should not hang + Enum.each(0..10, fn _ -> + put_multipart_new_edits_false(db_name, rev, multipart_data) + end) + end + + defp put_multipart_new_edits_false(db_name, rev, multipart_data) do + # Help ensure we're re-using client connections + ibrowse_opts = [{:max_sessions, 1}, {:max_pipeline_size, 1}] + resp = + Couch.put( + "/#{db_name}/multipart_replicated_changes?new_edits=false&rev=#{rev}", + body: multipart_data, + headers: ["Content-Type": "multipart/related;boundary=\"abc123\""], + ibrowse: ibrowse_opts + ) + + assert resp.status_code in [201, 202] + assert resp.body["ok"] == true + end + defp test_multipart_att_compression(dbname) do doc = %{ "_id" => "foobar" diff --git a/test/elixir/test/config/suite.elixir b/test/elixir/test/config/suite.elixir index 2e97553ee..e071da87f 100644 --- a/test/elixir/test/config/suite.elixir +++ b/test/elixir/test/config/suite.elixir @@ -10,7 +10,8 @@ ], "AttachmentMultipartTest": [ "manages attachments multipart requests successfully", - "manages compressed attachments successfully" + "manages compressed attachments successfully", + "multipart attachments with new_edits=false" ], "AttachmentNamesTest": [ "saves attachment names successfully" |