author    AlexanderKaraberov <3254818+AlexanderKaraberov@users.noreply.github.com>  2018-11-12 17:20:04 +0100
committer Nick Vatamaniuc <nickva@users.noreply.github.com>  2018-11-14 14:03:07 -0500
commit    ff271a95b1611e55d1b071d40fa8d4b4c4caefbd (patch)
tree      349ad373c70e5a9f59523ea9cbcbd60a314af1d1
parent    caf021f9d81866e5f62f977d2e4bd9b5b676bcb5 (diff)
download  couchdb-ff271a95b1611e55d1b071d40fa8d4b4c4caefbd.tar.gz
Add support for _bulk_get with multipart/mixed and multipart/related responses
-rw-r--r--  src/chttpd/src/chttpd_db.erl                            121
-rw-r--r--  src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl   304
2 files changed, 412 insertions(+), 13 deletions(-)
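With this change, a POST to /db/_bulk_get whose Accept header offers multipart/mixed or multipart/related (and does not also offer application/json) is answered as a chunked multipart body instead of the usual {"results": [...]} JSON: each requested document becomes one part of the outer multipart, lookup errors become small application/json parts flagged with error="true", and documents with attachments are streamed as nested multipart/related parts. The exchange below is only an illustrative sketch (document id, rev and boundary values are made up; the exact framing is produced by the handler in the diff that follows):

    POST /db/_bulk_get HTTP/1.1
    Accept: multipart/mixed
    Content-Type: application/json

    {"docs": [{"id": "doc1", "rev": "1-abc"}]}

    HTTP/1.1 200 OK
    Transfer-Encoding: chunked
    Content-Type: multipart/mixed; boundary="<outer-boundary>"

    --<outer-boundary>
    Content-Type: application/json

    {"_id":"doc1","_rev":"1-abc", ...}
    --<outer-boundary>--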
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index d46b5bbf2..9c78cf3fb 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -468,7 +468,8 @@ db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");
-db_req(#httpd{method='POST', path_parts=[_, <<"_bulk_get">>]}=Req, Db) ->
+db_req(#httpd{method='POST', path_parts=[_, <<"_bulk_get">>],
+ mochi_req=MochiReq}=Req, Db) ->
couch_stats:increment_counter([couchdb, httpd, bulk_requests]),
couch_httpd:validate_ctype(Req, "application/json"),
{JsonProps} = chttpd:json_body_obj(Req),
@@ -481,18 +482,62 @@ db_req(#httpd{method='POST', path_parts=[_, <<"_bulk_get">>]}=Req, Db) ->
} = bulk_get_parse_doc_query(Req),
Options = [{user_ctx, Req#httpd.user_ctx} | Options0],
- {ok, Resp} = start_json_response(Req, 200),
- send_chunk(Resp, <<"{\"results\": [">>),
-
- lists:foldl(fun(Doc, Sep) ->
- {DocId, Results, Options1} = bulk_get_open_doc_revs(Db, Doc,
- Options),
- bulk_get_send_docs_json(Resp, DocId, Results, Options1, Sep),
- <<",">>
- end, <<"">>, Docs),
-
- send_chunk(Resp, <<"]}">>),
- end_json_response(Resp)
+ AcceptJson = MochiReq:accepts_content_type("application/json"),
+ AcceptMixedMp = MochiReq:accepts_content_type("multipart/mixed"),
+ AcceptRelatedMp = MochiReq:accepts_content_type("multipart/related"),
+ AcceptMp = not AcceptJson andalso (AcceptMixedMp orelse AcceptRelatedMp),
+ case AcceptMp of
+ false ->
+ {ok, Resp} = start_json_response(Req, 200),
+ send_chunk(Resp, <<"{\"results\": [">>),
+ lists:foldl(fun(Doc, Sep) ->
+ {DocId, Results, Options1} = bulk_get_open_doc_revs(Db, Doc,
+ Options),
+ bulk_get_send_docs_json(Resp, DocId, Results, Options1, Sep),
+ <<",">>
+ end, <<"">>, Docs),
+ send_chunk(Resp, <<"]}">>),
+ end_json_response(Resp);
+ true ->
+ OuterBoundary = bulk_get_multipart_boundary(),
+ MpType = case AcceptMixedMp of
+ true ->
+ "multipart/mixed";
+ _ ->
+ "multipart/related"
+ end,
+ CType = {"Content-Type", MpType ++ "; boundary=\"" ++
+ ?b2l(OuterBoundary) ++ "\""},
+ {ok, Resp} = start_chunked_response(Req, 200, [CType]),
+ lists:foldl(fun(Doc, _Pre) ->
+ case bulk_get_open_doc_revs(Db, Doc, Options) of
+ {_, {ok, []}, _Options1} ->
+ ok;
+ {_, {ok, Results}, Options1} ->
+ send_docs_multipart_bulk_get(Results, Options1,
+ OuterBoundary, Resp);
+ {DocId, {error, {RevId, Error, Reason}}, _Options1} ->
+ Json = ?JSON_ENCODE({[
+ {<<"id">>, DocId},
+ {<<"rev">>, RevId},
+ {<<"error">>, Error},
+ {<<"reason">>, Reason}
+ ]}),
+ couch_httpd:send_chunk(Resp,[
+ <<"\r\n--", OuterBoundary/binary>>,
+ <<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
+ Json
+ ])
+ end
+ end, <<"">>, Docs),
+ case Docs of
+ [] ->
+ ok;
+ _ ->
+ couch_httpd:send_chunk(Resp, <<"\r\n", "--", OuterBoundary/binary, "--\r\n">>)
+ end,
+ couch_httpd:last_chunk(Resp)
+ end
end;
db_req(#httpd{path_parts=[_, <<"_bulk_get">>]}=Req, _Db) ->
send_method_not_allowed(Req, "POST");
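To exercise the new negotiation from the client side, an Erlang sketch along these lines should work against a running node (the URL, database name, document id and rev are placeholders, not anything defined by this patch). Note that the handler only switches to multipart when application/json is not acceptable to the client:

    %% Hypothetical client helper using OTP's inets httpc; adjust Url and Body as needed.
    fetch_bulk_get_multipart() ->
        {ok, _} = application:ensure_all_started(inets),
        Url = "http://127.0.0.1:5984/db/_bulk_get",
        Body = <<"{\"docs\": [{\"id\": \"doc1\", \"rev\": \"1-abc\"}]}">>,
        %% Ask only for multipart/mixed; also listing application/json would make
        %% the handler above fall back to the JSON response.
        Headers = [{"Accept", "multipart/mixed"}],
        {ok, {{_, 200, _}, RespHeaders, RespBody}} =
            httpc:request(post, {Url, Headers, "application/json", Body}, [], []),
        {RespHeaders, RespBody}.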
@@ -962,6 +1007,39 @@ send_doc_efficiently(#httpd{mochi_req=MochiReq}=Req, #doc{atts=Atts}=Doc, Header
send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options))
end.
+send_docs_multipart_bulk_get(Results, Options0, OuterBoundary, Resp) ->
+ InnerBoundary = bulk_get_multipart_boundary(),
+ Options = [attachments, follows, att_encoding_info | Options0],
+ lists:foreach(
+ fun({ok, #doc{id=Id, revs=Revs, atts=Atts}=Doc}) ->
+ Refs = monitor_attachments(Doc#doc.atts),
+ try
+ JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
+ couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>),
+ case Atts of
+ [] ->
+ couch_httpd:send_chunk(Resp, <<"\r\nContent-Type: application/json\r\n\r\n">>);
+ _ ->
+ lists:foreach(fun(Header) -> couch_httpd:send_chunk(Resp, Header) end,
+ bulk_get_multipart_headers(Revs, Id, InnerBoundary))
+ end,
+ couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts,
+ fun(Data) -> couch_httpd:send_chunk(Resp, Data)
+ end, true)
+ after
+ demonitor_refs(Refs)
+ end;
+ ({{not_found, missing}, RevId}) ->
+ RevStr = couch_doc:rev_to_str(RevId),
+ Json = ?JSON_ENCODE({[{<<"rev">>, RevStr},
+ {<<"error">>, <<"not_found">>},
+ {<<"reason">>, <<"missing">>}]}),
+ couch_httpd:send_chunk(Resp,
+ [<<"\r\n--", OuterBoundary/binary>>,
+ <<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
+ Json])
+ end, Results).
+
send_docs_multipart(Req, Results, Options1) ->
OuterBoundary = couch_uuids:random(),
InnerBoundary = couch_uuids:random(),
@@ -997,6 +1075,23 @@ send_docs_multipart(Req, Results, Options1) ->
couch_httpd:send_chunk(Resp, <<"--">>),
couch_httpd:last_chunk(Resp).
+bulk_get_multipart_headers({0, []}, Id, Boundary) ->
+ [
+ <<"\r\nX-Doc-Id: ", Id/binary>>,
+ <<"\r\nContent-Type: multipart/related; boundary=", Boundary/binary, "\r\n\r\n">>
+ ];
+bulk_get_multipart_headers({Start, [FirstRevId|_]}, Id, Boundary) ->
+ RevStr = couch_doc:rev_to_str({Start, FirstRevId}),
+ [
+ <<"\r\nX-Doc-Id: ", Id/binary>>,
+ <<"\r\nX-Rev-Id: ", RevStr/binary>>,
+ <<"\r\nContent-Type: multipart/related; boundary=", Boundary/binary, "\r\n\r\n">>
+ ].
+
+bulk_get_multipart_boundary() ->
+ Unique = couch_uuids:random(),
+ <<"--", Unique/binary>>.
+
receive_request_data(Req) ->
receive_request_data(Req, chttpd:body_length(Req)).
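For orientation, one document part emitted by send_docs_multipart_bulk_get/4 for a document with attachments is framed roughly as below (boundaries are random values from bulk_get_multipart_boundary/0; attachment part headers are abbreviated, and the exact bytes come from bulk_get_multipart_headers/3 and couch_doc:doc_to_multi_part_stream/5 above). A document without attachments gets a plain Content-Type: application/json part instead:

    --<outer-boundary>
    X-Doc-Id: doc1
    X-Rev-Id: 1-abc
    Content-Type: multipart/related; boundary=<inner-boundary>

    --<inner-boundary>
    Content-Type: application/json

    {"_id":"doc1","_rev":"1-abc","_attachments":{...}}
    --<inner-boundary>
    ...attachment headers and bytes...
    --<inner-boundary>--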
diff --git a/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl b/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl
new file mode 100644
index 000000000..601f720a0
--- /dev/null
+++ b/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl
@@ -0,0 +1,304 @@
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
+
+-module(chttpd_db_bulk_get_multipart_test).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(TIMEOUT, 3000).
+
+
+setup() ->
+ mock(config),
+ mock(chttpd),
+ mock(couch_epi),
+ mock(couch_httpd),
+ mock(couch_stats),
+ mock(fabric),
+ mock(mochireq),
+ Pid = spawn_accumulator(),
+ Pid.
+
+
+teardown(Pid) ->
+ ok = stop_accumulator(Pid),
+ meck:unload().
+
+
+bulk_get_test_() ->
+ {
+ "/db/_bulk_get tests",
+ {
+ foreach, fun setup/0, fun teardown/1,
+ [
+ fun should_require_docs_field/1,
+ fun should_not_accept_specific_query_params/1,
+ fun should_return_empty_results_on_no_docs/1,
+ fun should_get_doc_with_all_revs/1,
+ fun should_validate_doc_with_bad_id/1,
+ fun should_validate_doc_with_bad_rev/1,
+ fun should_validate_missing_doc/1,
+ fun should_validate_bad_atts_since/1,
+ fun should_include_attachments_when_atts_since_specified/1
+ ]
+ }
+ }.
+
+
+should_require_docs_field(_) ->
+ Req = fake_request({[{}]}),
+ ?_assertThrow({bad_request, _}, chttpd_db:db_req(Req, nil)).
+
+
+should_not_accept_specific_query_params(_) ->
+ Req = fake_request({[{<<"docs">>, []}]}),
+ lists:map(fun (Param) ->
+ {Param, ?_assertThrow({bad_request, _},
+ begin
+ ok = meck:expect(chttpd, qs,
+ fun(_) -> [{Param, ""}] end),
+ chttpd_db:db_req(Req, nil)
+ end)}
+ end, ["rev", "open_revs", "atts_since", "w", "new_edits"]).
+
+
+should_return_empty_results_on_no_docs(Pid) ->
+ Req = fake_request({[{<<"docs">>, []}]}),
+ chttpd_db:db_req(Req, nil),
+ Results = get_results_from_response(Pid),
+ ?_assertEqual([], Results).
+
+
+should_get_doc_with_all_revs(Pid) ->
+ DocId = <<"docudoc">>,
+ Req = fake_request(DocId),
+
+ DocRevA = #doc{id = DocId, body = {[{<<"_rev">>, <<"1-ABC">>}]}},
+ DocRevB = #doc{id = DocId, body = {[{<<"_rev">>, <<"1-CDE">>}]}},
+
+ mock_open_revs(all, {ok, [{ok, DocRevA}, {ok, DocRevB}]}),
+ chttpd_db:db_req(Req, nil),
+
+ Result = get_results_from_response(Pid),
+ ?_assertEqual(DocId, couch_util:get_value(<<"_id">>, Result)).
+
+
+should_validate_doc_with_bad_id(Pid) ->
+ DocId = <<"_docudoc">>,
+
+ Req = fake_request(DocId),
+ chttpd_db:db_req(Req, nil),
+
+ Result = get_results_from_response(Pid),
+ ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
+
+ ?_assertMatch([{<<"id">>, DocId},
+ {<<"rev">>, null},
+ {<<"error">>, <<"illegal_docid">>},
+ {<<"reason">>, _}], Result).
+
+
+should_validate_doc_with_bad_rev(Pid) ->
+ DocId = <<"docudoc">>,
+ Rev = <<"revorev">>,
+
+ Req = fake_request(DocId, Rev),
+ chttpd_db:db_req(Req, nil),
+
+ Result = get_results_from_response(Pid),
+ ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
+
+ ?_assertMatch([{<<"id">>, DocId},
+ {<<"rev">>, Rev},
+ {<<"error">>, <<"bad_request">>},
+ {<<"reason">>, _}], Result).
+
+
+should_validate_missing_doc(Pid) ->
+ DocId = <<"docudoc">>,
+ Rev = <<"1-revorev">>,
+
+ Req = fake_request(DocId, Rev),
+ mock_open_revs([{1,<<"revorev">>}], {ok, []}),
+ chttpd_db:db_req(Req, nil),
+
+ Result = get_results_from_response(Pid),
+ ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
+
+ ?_assertMatch([{<<"id">>, DocId},
+ {<<"rev">>, Rev},
+ {<<"error">>, <<"not_found">>},
+ {<<"reason">>, _}], Result).
+
+
+should_validate_bad_atts_since(Pid) ->
+ DocId = <<"docudoc">>,
+ Rev = <<"1-revorev">>,
+
+ Req = fake_request(DocId, Rev, <<"badattsince">>),
+ mock_open_revs([{1,<<"revorev">>}], {ok, []}),
+ chttpd_db:db_req(Req, nil),
+
+ Result = get_results_from_response(Pid),
+ ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
+
+ ?_assertMatch([{<<"id">>, DocId},
+ {<<"rev">>, <<"badattsince">>},
+ {<<"error">>, <<"bad_request">>},
+ {<<"reason">>, _}], Result).
+
+
+should_include_attachments_when_atts_since_specified(_) ->
+ DocId = <<"docudoc">>,
+ Rev = <<"1-revorev">>,
+
+ Req = fake_request(DocId, Rev, [<<"1-abc">>]),
+ mock_open_revs([{1,<<"revorev">>}], {ok, []}),
+ chttpd_db:db_req(Req, nil),
+
+ ?_assert(meck:called(fabric, open_revs,
+ [nil, DocId, [{1, <<"revorev">>}],
+ [{atts_since, [{1, <<"abc">>}]}, attachments,
+ {user_ctx, undefined}]])).
+
+%% helpers
+
+fake_request(Payload) when is_tuple(Payload) ->
+ #httpd{method='POST', path_parts=[<<"db">>, <<"_bulk_get">>],
+ mochi_req=mochireq, req_body=Payload};
+fake_request(DocId) when is_binary(DocId) ->
+ fake_request({[{<<"docs">>, [{[{<<"id">>, DocId}]}]}]}).
+
+fake_request(DocId, Rev) ->
+ fake_request({[{<<"docs">>, [{[{<<"id">>, DocId}, {<<"rev">>, Rev}]}]}]}).
+
+fake_request(DocId, Rev, AttsSince) ->
+ fake_request({[{<<"docs">>, [{[{<<"id">>, DocId},
+ {<<"rev">>, Rev},
+ {<<"atts_since">>, AttsSince}]}]}]}).
+
+
+mock_open_revs(RevsReq0, RevsResp) ->
+ ok = meck:expect(fabric, open_revs,
+ fun(_, _, RevsReq1, _) ->
+ ?assertEqual(RevsReq0, RevsReq1),
+ RevsResp
+ end).
+
+
+mock(mochireq) ->
+ ok = meck:new(mochireq, [non_strict]),
+ ok = meck:expect(mochireq, parse_qs, fun() -> [] end),
+ ok = meck:expect(mochireq, accepts_content_type, fun("multipart/mixed") -> true;
+ ("multipart/related") -> true;
+ (_) -> false end),
+ ok;
+mock(couch_httpd) ->
+ ok = meck:new(couch_httpd, [passthrough]),
+ ok = meck:expect(couch_httpd, validate_ctype, fun(_, _) -> ok end),
+ ok = meck:expect(couch_httpd, last_chunk, fun(_) -> {ok, nil} end),
+ ok = meck:expect(couch_httpd, send_chunk, fun send_chunk/2),
+ ok;
+mock(chttpd) ->
+ ok = meck:new(chttpd, [passthrough]),
+ ok = meck:expect(chttpd, start_json_response, fun(_, _) -> {ok, nil} end),
+ ok = meck:expect(chttpd, start_chunked_response, fun(_, _, _) -> {ok, nil} end),
+ ok = meck:expect(chttpd, end_json_response, fun(_) -> ok end),
+ ok = meck:expect(chttpd, send_chunk, fun send_chunk/2),
+ ok = meck:expect(chttpd, json_body_obj, fun (#httpd{req_body=Body}) -> Body end),
+ ok;
+mock(couch_epi) ->
+ ok = meck:new(couch_epi, [passthrough]),
+ ok = meck:expect(couch_epi, any, fun(_, _, _, _, _) -> false end),
+ ok;
+mock(couch_stats) ->
+ ok = meck:new(couch_stats, [passthrough]),
+ ok = meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
+ ok = meck:expect(couch_stats, increment_counter, fun(_, _) -> ok end),
+ ok = meck:expect(couch_stats, decrement_counter, fun(_) -> ok end),
+ ok = meck:expect(couch_stats, decrement_counter, fun(_, _) -> ok end),
+ ok = meck:expect(couch_stats, update_histogram, fun(_, _) -> ok end),
+ ok = meck:expect(couch_stats, update_gauge, fun(_, _) -> ok end),
+ ok;
+mock(fabric) ->
+ ok = meck:new(fabric, [passthrough]),
+ ok;
+mock(config) ->
+ ok = meck:new(config, [passthrough]),
+ ok = meck:expect(config, get, fun(_, _, Default) -> Default end),
+ ok.
+
+
+spawn_accumulator() ->
+ Parent = self(),
+ Pid = spawn(fun() -> accumulator_loop(Parent, []) end),
+ erlang:put(chunks_gather, Pid),
+ Pid.
+
+accumulator_loop(Parent, Acc) ->
+ receive
+ {stop, Ref} ->
+ Parent ! {ok, Ref};
+ {get, Ref} ->
+ Parent ! {ok, Ref, Acc},
+ accumulator_loop(Parent, Acc);
+ {put, Ref, Chunk} ->
+ Parent ! {ok, Ref},
+ accumulator_loop(Parent, [Chunk|Acc])
+ end.
+
+stop_accumulator(Pid) ->
+ Ref = make_ref(),
+ Pid ! {stop, Ref},
+ receive
+ {ok, Ref} ->
+ ok
+ after ?TIMEOUT ->
+ throw({timeout, <<"process stop timeout">>})
+ end.
+
+
+send_chunk(_, []) ->
+ {ok, nil};
+send_chunk(_Req, [H|T]=Chunk) when is_list(Chunk) ->
+ send_chunk(_Req, H),
+ send_chunk(_Req, T);
+send_chunk(_, Chunk) ->
+ Worker = erlang:get(chunks_gather),
+ Ref = make_ref(),
+ Worker ! {put, Ref, Chunk},
+ receive
+ {ok, Ref} -> {ok, nil}
+ after ?TIMEOUT ->
+ throw({timeout, <<"send chunk timeout">>})
+ end.
+
+
+get_response(Pid) ->
+ Ref = make_ref(),
+ Pid ! {get, Ref},
+ receive
+ {ok, Ref, Acc} ->
+ Acc
+ after ?TIMEOUT ->
+ throw({timeout, <<"get response timeout">>})
+ end.
+
+get_results_from_response(Pid) ->
+ case get_response(Pid) of
+ [] ->
+ [];
+ Result ->
+ {Result1} = ?JSON_DECODE(lists:nth(2, Result)),
+ Result1
+ end.
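The new suite is plain eunit; assuming the chttpd application and its test dependencies (notably meck) are compiled and on the code path, it can be run directly from an Erlang shell:

    %% [verbose] prints a line per test case
    eunit:test(chttpd_db_bulk_get_multipart_test, [verbose]).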