author      Ronny <ronny@apache.org>      2022-05-19 16:00:28 +0200
committer   GitHub <noreply@github.com>   2022-05-19 16:00:28 +0200
commit      53228f82400c7451a8fb1d2f45c9734c8a9eae1d (patch)
tree        74564d45753e7738c8816ff7579cdc5fa00dc17a
parent      f3b5f2755588f2f31951e05bf925f303101f9ffd (diff)
parent      6e0c80a42b1c2364e1b4930448db528e2739af49 (diff)
download    couchdb-53228f82400c7451a8fb1d2f45c9734c8a9eae1d.tar.gz
Merge branch '3.x' into big-r81-patch-1
-rw-r--r--  src/chttpd/src/chttpd_db.erl                      32
-rw-r--r--  src/couch/src/couch_httpd_multipart.erl           30
-rw-r--r--  src/fabric/src/fabric_rpc.erl                     46
-rw-r--r--  test/elixir/test/attachments_multipart_test.exs   67
-rw-r--r--  test/elixir/test/config/suite.elixir               3
5 files changed, 141 insertions, 37 deletions
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 9c9c4ef87..4392df194 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -1206,9 +1206,16 @@ db_doc_req(#httpd{method = 'PUT', user_ctx = Ctx} = Req, Db, DocId) ->
),
Doc = couch_doc_from_req(Req, Db, DocId, Doc0),
try
- Result = send_updated_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType),
+ {HttpCode, RespHeaders1, RespBody} = update_doc_req(
+ Req,
+ Db,
+ DocId,
+ Doc,
+ RespHeaders,
+ UpdateType
+ ),
WaitFun(),
- Result
+ send_json(Req, HttpCode, RespHeaders1, RespBody)
catch
throw:Err ->
% Document rejected by a validate_doc_update function.
@@ -1534,14 +1541,12 @@ send_updated_doc(Req, Db, DocId, Json) ->
send_updated_doc(Req, Db, DocId, Doc, Headers) ->
send_updated_doc(Req, Db, DocId, Doc, Headers, interactive_edit).
-send_updated_doc(
+send_updated_doc(Req, Db, DocId, Doc, Headers, Type) ->
+ {Code, Headers1, Body} = update_doc_req(Req, Db, DocId, Doc, Headers, Type),
+ send_json(Req, Code, Headers1, Body).
+
+update_doc_req(Req, Db, DocId, Doc, Headers, UpdateType) ->
#httpd{user_ctx = Ctx} = Req,
- Db,
- DocId,
- #doc{deleted = Deleted} = Doc,
- Headers,
- UpdateType
-) ->
W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
Options =
case couch_httpd:header_value(Req, "X-Couch-Full-Commit") of
@@ -1552,15 +1557,10 @@ send_updated_doc(
_ ->
[UpdateType, {user_ctx, Ctx}, {w, W}]
end,
- {Status, {etag, Etag}, Body} = update_doc(
- Db,
- DocId,
- #doc{deleted = Deleted} = Doc,
- Options
- ),
+ {Status, {etag, Etag}, Body} = update_doc(Db, DocId, Doc, Options),
HttpCode = http_code_from_status(Status),
ResponseHeaders = [{"ETag", Etag} | Headers],
- send_json(Req, HttpCode, ResponseHeaders, Body).
+ {HttpCode, ResponseHeaders, Body}.
http_code_from_status(Status) ->
case Status of
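
Note on the chttpd_db.erl hunks above: the old send_updated_doc/6 both computed the response and wrote it to the socket, so the PUT handler could not run WaitFun() in between. The new update_doc_req/6 returns {HttpCode, Headers, Body} as plain data, and the handler sends the JSON itself afterwards. A minimal sketch of that pattern, under illustrative names (respond_after_wait_sketch is not part of the patch):

    -module(respond_after_wait_sketch).
    -export([respond/2]).

    respond(UpdateFun, WaitFun) ->
        %% Compute status code, headers and body as plain data first;
        %% in the patch this role is played by update_doc_req/6.
        {Code, Headers, Body} = UpdateFun(),
        %% Any barrier can now run before the first byte reaches the client;
        %% in the PUT handler this is WaitFun().
        WaitFun(),
        %% Only at this point would chttpd call send_json(Req, Code, Headers, Body).
        {Code, Headers, Body}.
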
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl
index 95a2c9e3c..11ee6790e 100644
--- a/src/couch/src/couch_httpd_multipart.erl
+++ b/src/couch/src/couch_httpd_multipart.erl
@@ -104,6 +104,9 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
receive
abort_parsing ->
ok;
+ {hello_from_writer, Ref, WriterPid} ->
+ NewCounters = handle_hello(WriterPid, Counters),
+ mp_parse_atts(eof, {Ref, Chunks, Offset, NewCounters, Waiting});
{get_bytes, Ref, From} ->
C2 = update_writer(From, Counters),
case maybe_send_data({Ref, Chunks, Offset, C2, [From | Waiting]}) of
@@ -134,6 +137,9 @@ mp_abort_parse_atts(_, _) ->
maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
receive
+ {hello_from_writer, Ref, WriterPid} ->
+ NewCounters = handle_hello(WriterPid, Counters),
+ maybe_send_data({Ref, Chunks, Offset, NewCounters, Waiting});
{get_bytes, Ref, From} ->
NewCounters = update_writer(From, Counters),
maybe_send_data({Ref, Chunks, Offset, NewCounters, [From | Waiting]})
@@ -195,6 +201,9 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting},
maybe_send_data(NewAcc)
end;
+ {hello_from_writer, Ref, WriterPid} ->
+ C2 = handle_hello(WriterPid, Counters),
+ maybe_send_data({Ref, NewChunks, NewOffset, C2, Waiting});
{get_bytes, Ref, X} ->
C2 = update_writer(X, Counters),
maybe_send_data({Ref, NewChunks, NewOffset, C2, [X | NewWaiting]})
@@ -204,17 +213,18 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
end
end.
+handle_hello(WriterPid, Counters) ->
+ WriterRef = erlang:monitor(process, WriterPid),
+ orddict:store(WriterPid, {WriterRef, 0}, Counters).
+
update_writer(WriterPid, Counters) ->
- UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end,
- InitialValue =
- case orddict:find(WriterPid, Counters) of
- {ok, IV} ->
- IV;
- error ->
- WriterRef = erlang:monitor(process, WriterPid),
- {WriterRef, 1}
- end,
- orddict:update(WriterPid, UpdateFun, InitialValue, Counters).
+ case orddict:find(WriterPid, Counters) of
+ {ok, {WriterRef, Count}} ->
+ orddict:store(WriterPid, {WriterRef, Count + 1}, Counters);
+ error ->
+ WriterRef = erlang:monitor(process, WriterPid),
+ orddict:store(WriterPid, {WriterRef, 1}, Counters)
+ end.
remove_writer(WriterPid, WriterRef, Counters) ->
case orddict:find(WriterPid, Counters) of
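
The multipart parser above now learns about writers in two ways: the new hello_from_writer registration (handle_hello/2, which monitors the writer and stores a zero byte-request count) and the existing get_bytes path (update_writer/2, which increments the count). A self-contained sketch of that receive pattern, using a hypothetical parser_loop/2 rather than the real mp_parse_atts state machine:

    -module(mp_counters_sketch).
    -export([parser_loop/2]).

    %% Counters is an orddict of WriterPid => {MonitorRef, BytesRequested}.
    parser_loop(Ref, Counters) ->
        receive
            {hello_from_writer, Ref, WriterPid} ->
                %% The writer announces itself before (or instead of) asking
                %% for bytes, so the parser can monitor it right away.
                WriterRef = erlang:monitor(process, WriterPid),
                parser_loop(Ref, orddict:store(WriterPid, {WriterRef, 0}, Counters));
            {get_bytes, Ref, WriterPid} ->
                WriterPid ! {bytes, Ref, <<"chunk">>},
                parser_loop(Ref, update_writer(WriterPid, Counters));
            {'DOWN', _MRef, process, WriterPid, _Reason} ->
                %% A registered writer died; drop it instead of waiting on it forever.
                parser_loop(Ref, orddict:erase(WriterPid, Counters));
            abort_parsing ->
                ok
        end.

    %% Same shape as the patched update_writer/2 above.
    update_writer(WriterPid, Counters) ->
        case orddict:find(WriterPid, Counters) of
            {ok, {WriterRef, Count}} ->
                orddict:store(WriterPid, {WriterRef, Count + 1}, Counters);
            error ->
                WriterRef = erlang:monitor(process, WriterPid),
                orddict:store(WriterPid, {WriterRef, 1}, Counters)
        end.
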
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index a90c94ade..cfda1a37e 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -637,23 +637,49 @@ make_att_readers([#doc{atts = Atts0} = Doc | Rest]) ->
Atts = [couch_att:transform(data, fun make_att_reader/1, Att) || Att <- Atts0],
[Doc#doc{atts = Atts} | make_att_readers(Rest)].
-make_att_reader({follows, Parser, Ref}) ->
+make_att_reader({follows, Parser, Ref}) when is_pid(Parser) ->
+ % This code will fail if the returned closure is called by a
+ % process other than the one that called make_att_reader/1 in the
+ % first place. The reason we don't put everything inside the
+ % closure is that the `hello_from_writer` message must *always* be
+ % sent to the parser, even if the closure never gets called. Also,
+ % make sure `hello_from_writer` is sent only once for all the
+ % rest of the possible attachments.
+ WriterPid = self(),
+ ParserRef =
+ case get({mp_parser_ref, Parser}) of
+ undefined ->
+ % First time encountering a particular parser pid. Monitor it,
+ % in case it dies, and notify it about us, so it could monitor
+ % us in case we die.
+ PRef = erlang:monitor(process, Parser),
+ put({mp_parser_ref, Parser}, PRef),
+ Parser ! {hello_from_writer, Ref, WriterPid},
+ PRef;
+ Else ->
+ Else
+ end,
fun() ->
- ParserRef =
- case get(mp_parser_ref) of
- undefined ->
- PRef = erlang:monitor(process, Parser),
- put(mp_parser_ref, PRef),
- PRef;
- Else ->
- Else
- end,
+ % Make sure the closure is always called from the same process which
+ % sent the hello_from_writer message.
+ case self() =:= WriterPid of
+ true -> ok;
+ false -> error({make_att_pid_assertion, self(), WriterPid})
+ end,
+ % Check if parser already died. This is for belt and suspenders mostly,
+ % in case somehow we call the data function again after mp_parser_died
+ % was thrown, so we are not stuck forever waiting for bytes.
+ case get({mp_parser_died, Parser}) of
+ undefined -> ok;
+ AlreadyDiedReason -> throw({mp_parser_died, AlreadyDiedReason})
+ end,
Parser ! {get_bytes, Ref, self()},
receive
{bytes, Ref, Bytes} ->
rexi:reply(attachment_chunk_received),
Bytes;
{'DOWN', ParserRef, _, _, Reason} ->
+ put({mp_parser_died, Parser}, Reason),
throw({mp_parser_died, Reason})
end
end;
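
On the writer (fabric_rpc) side, the patch caches the parser monitor in the process dictionary keyed by the parser pid, sends hello_from_writer exactly once per parser, and remembers a dead parser so later reads fail fast instead of blocking. A standalone sketch of that handshake (att_writer_sketch and its function names are illustrative only, not the patched make_att_reader/1):

    -module(att_writer_sketch).
    -export([ensure_hello/2, read_chunk/2]).

    %% Monitor each parser once per writer process and announce ourselves,
    %% whether or not we ever ask it for bytes.
    ensure_hello(Parser, Ref) ->
        case get({mp_parser_ref, Parser}) of
            undefined ->
                ParserRef = erlang:monitor(process, Parser),
                put({mp_parser_ref, Parser}, ParserRef),
                Parser ! {hello_from_writer, Ref, self()},
                ParserRef;
            ParserRef ->
                ParserRef
        end.

    %% Ask the parser for the next chunk; if it already died, fail immediately
    %% rather than sitting in receive forever.
    read_chunk(Parser, Ref) ->
        ParserRef = ensure_hello(Parser, Ref),
        case get({mp_parser_died, Parser}) of
            undefined -> ok;
            OldReason -> throw({mp_parser_died, OldReason})
        end,
        Parser ! {get_bytes, Ref, self()},
        receive
            {bytes, Ref, Bytes} ->
                Bytes;
            {'DOWN', ParserRef, _, _, Reason} ->
                put({mp_parser_died, Parser}, Reason),
                throw({mp_parser_died, Reason})
        end.
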
diff --git a/test/elixir/test/attachments_multipart_test.exs b/test/elixir/test/attachments_multipart_test.exs
index f7d5d9519..2cedef513 100644
--- a/test/elixir/test/attachments_multipart_test.exs
+++ b/test/elixir/test/attachments_multipart_test.exs
@@ -260,6 +260,73 @@ defmodule AttachmentMultipartTest do
)
end
+ @tag :with_db
+ test "multipart attachments with new_edits=false", context do
+ db_name = context[:db_name]
+
+ att_data = String.duplicate("x", 100_000)
+ att_len = byte_size(att_data)
+ document = """
+ {
+ "body": "This is a body.",
+ "_attachments": {
+ "foo.txt": {
+ "follows": true,
+ "content_type": "application/test",
+ "length": #{att_len}
+ }
+ }
+ }
+ """
+
+ multipart_data =
+ "--abc123\r\n" <>
+ "content-type: application/json\r\n" <>
+ "\r\n" <>
+ document <>
+ "\r\n--abc123\r\n" <>
+ "\r\n" <>
+ att_data <>
+ "\r\n--abc123--epilogue"
+
+ resp =
+ Couch.put(
+ "/#{db_name}/multipart_replicated_changes",
+ body: multipart_data,
+ headers: ["Content-Type": "multipart/related;boundary=\"abc123\""]
+ )
+
+ assert resp.status_code in [201, 202]
+ assert resp.body["ok"] == true
+
+ rev = resp.body["rev"]
+
+ resp = Couch.get("/#{db_name}/multipart_replicated_changes/foo.txt")
+
+ assert resp.body == att_data
+
+ # https://github.com/apache/couchdb/issues/3939
+ # Repeating the request should not hang
+ Enum.each(0..10, fn _ ->
+ put_multipart_new_edits_false(db_name, rev, multipart_data)
+ end)
+ end
+
+ defp put_multipart_new_edits_false(db_name, rev, multipart_data) do
+ # Help ensure we're re-using client connections
+ ibrowse_opts = [{:max_sessions, 1}, {:max_pipeline_size, 1}]
+ resp =
+ Couch.put(
+ "/#{db_name}/multipart_replicated_changes?new_edits=false&rev=#{rev}",
+ body: multipart_data,
+ headers: ["Content-Type": "multipart/related;boundary=\"abc123\""],
+ ibrowse: ibrowse_opts
+ )
+
+ assert resp.status_code in [201, 202]
+ assert resp.body["ok"] == true
+ end
+
defp test_multipart_att_compression(dbname) do
doc = %{
"_id" => "foobar"
diff --git a/test/elixir/test/config/suite.elixir b/test/elixir/test/config/suite.elixir
index 2e97553ee..e071da87f 100644
--- a/test/elixir/test/config/suite.elixir
+++ b/test/elixir/test/config/suite.elixir
@@ -10,7 +10,8 @@
],
"AttachmentMultipartTest": [
"manages attachments multipart requests successfully",
- "manages compressed attachments successfully"
+ "manages compressed attachments successfully",
+ "multipart attachments with new_edits=false"
],
"AttachmentNamesTest": [
"saves attachment names successfully"