diff options
author | AlexanderKaraberov <3254818+AlexanderKaraberov@users.noreply.github.com> | 2018-11-12 17:20:04 +0100 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2018-11-14 14:03:07 -0500 |
commit | ff271a95b1611e55d1b071d40fa8d4b4c4caefbd (patch) | |
tree | 349ad373c70e5a9f59523ea9cbcbd60a314af1d1 | |
parent | caf021f9d81866e5f62f977d2e4bd9b5b676bcb5 (diff) | |
download | couchdb-ff271a95b1611e55d1b071d40fa8d4b4c4caefbd.tar.gz |
Add support for _bulk_get with multipart/mixed and multipart/realated responses
-rw-r--r-- | src/chttpd/src/chttpd_db.erl | 121 | ||||
-rw-r--r-- | src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl | 304 |
2 files changed, 412 insertions, 13 deletions
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index d46b5bbf2..9c78cf3fb 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -468,7 +468,8 @@ db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) -> send_method_not_allowed(Req, "POST"); -db_req(#httpd{method='POST', path_parts=[_, <<"_bulk_get">>]}=Req, Db) -> +db_req(#httpd{method='POST', path_parts=[_, <<"_bulk_get">>], + mochi_req=MochiReq}=Req, Db) -> couch_stats:increment_counter([couchdb, httpd, bulk_requests]), couch_httpd:validate_ctype(Req, "application/json"), {JsonProps} = chttpd:json_body_obj(Req), @@ -481,18 +482,62 @@ db_req(#httpd{method='POST', path_parts=[_, <<"_bulk_get">>]}=Req, Db) -> } = bulk_get_parse_doc_query(Req), Options = [{user_ctx, Req#httpd.user_ctx} | Options0], - {ok, Resp} = start_json_response(Req, 200), - send_chunk(Resp, <<"{\"results\": [">>), - - lists:foldl(fun(Doc, Sep) -> - {DocId, Results, Options1} = bulk_get_open_doc_revs(Db, Doc, - Options), - bulk_get_send_docs_json(Resp, DocId, Results, Options1, Sep), - <<",">> - end, <<"">>, Docs), - - send_chunk(Resp, <<"]}">>), - end_json_response(Resp) + AcceptJson = MochiReq:accepts_content_type("application/json"), + AcceptMixedMp = MochiReq:accepts_content_type("multipart/mixed"), + AcceptRelatedMp = MochiReq:accepts_content_type("multipart/related"), + AcceptMp = not AcceptJson andalso (AcceptMixedMp orelse AcceptRelatedMp), + case AcceptMp of + false -> + {ok, Resp} = start_json_response(Req, 200), + send_chunk(Resp, <<"{\"results\": [">>), + lists:foldl(fun(Doc, Sep) -> + {DocId, Results, Options1} = bulk_get_open_doc_revs(Db, Doc, + Options), + bulk_get_send_docs_json(Resp, DocId, Results, Options1, Sep), + <<",">> + end, <<"">>, Docs), + send_chunk(Resp, <<"]}">>), + end_json_response(Resp); + true -> + OuterBoundary = bulk_get_multipart_boundary(), + MpType = case AcceptMixedMp of + true -> + "multipart/mixed"; + _ -> + "multipart/related" + end, + CType = {"Content-Type", MpType ++ "; boundary=\"" ++ + ?b2l(OuterBoundary) ++ "\""}, + {ok, Resp} = start_chunked_response(Req, 200, [CType]), + lists:foldl(fun(Doc, _Pre) -> + case bulk_get_open_doc_revs(Db, Doc, Options) of + {_, {ok, []}, _Options1} -> + ok; + {_, {ok, Results}, Options1} -> + send_docs_multipart_bulk_get(Results, Options1, + OuterBoundary, Resp); + {DocId, {error, {RevId, Error, Reason}}, _Options1} -> + Json = ?JSON_ENCODE({[ + {<<"id">>, DocId}, + {<<"rev">>, RevId}, + {<<"error">>, Error}, + {<<"reason">>, Reason} + ]}), + couch_httpd:send_chunk(Resp,[ + <<"\r\n--", OuterBoundary/binary>>, + <<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>, + Json + ]) + end + end, <<"">>, Docs), + case Docs of + [] -> + ok; + _ -> + couch_httpd:send_chunk(Resp, <<"\r\n", "--", OuterBoundary/binary, "--\r\n">>) + end, + couch_httpd:last_chunk(Resp) + end end; db_req(#httpd{path_parts=[_, <<"_bulk_get">>]}=Req, _Db) -> send_method_not_allowed(Req, "POST"); @@ -962,6 +1007,39 @@ send_doc_efficiently(#httpd{mochi_req=MochiReq}=Req, #doc{atts=Atts}=Doc, Header send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options)) end. +send_docs_multipart_bulk_get(Results, Options0, OuterBoundary, Resp) -> + InnerBoundary = bulk_get_multipart_boundary(), + Options = [attachments, follows, att_encoding_info | Options0], + lists:foreach( + fun({ok, #doc{id=Id, revs=Revs, atts=Atts}=Doc}) -> + Refs = monitor_attachments(Doc#doc.atts), + try + JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)), + couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>), + case Atts of + [] -> + couch_httpd:send_chunk(Resp, <<"\r\nContent-Type: application/json\r\n\r\n">>); + _ -> + lists:foreach(fun(Header) -> couch_httpd:send_chunk(Resp, Header) end, + bulk_get_multipart_headers(Revs, Id, InnerBoundary)) + end, + couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts, + fun(Data) -> couch_httpd:send_chunk(Resp, Data) + end, true) + after + demonitor_refs(Refs) + end; + ({{not_found, missing}, RevId}) -> + RevStr = couch_doc:rev_to_str(RevId), + Json = ?JSON_ENCODE({[{<<"rev">>, RevStr}, + {<<"error">>, <<"not_found">>}, + {<<"reason">>, <<"missing">>}]}), + couch_httpd:send_chunk(Resp, + [<<"\r\n--", OuterBoundary/binary>>, + <<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>, + Json]) + end, Results). + send_docs_multipart(Req, Results, Options1) -> OuterBoundary = couch_uuids:random(), InnerBoundary = couch_uuids:random(), @@ -997,6 +1075,23 @@ send_docs_multipart(Req, Results, Options1) -> couch_httpd:send_chunk(Resp, <<"--">>), couch_httpd:last_chunk(Resp). +bulk_get_multipart_headers({0, []}, Id, Boundary) -> + [ + <<"\r\nX-Doc-Id: ", Id/binary>>, + <<"\r\nContent-Type: multipart/related; boundary=", Boundary/binary, "\r\n\r\n">> + ]; +bulk_get_multipart_headers({Start, [FirstRevId|_]}, Id, Boundary) -> + RevStr = couch_doc:rev_to_str({Start, FirstRevId}), + [ + <<"\r\nX-Doc-Id: ", Id/binary>>, + <<"\r\nX-Rev-Id: ", RevStr/binary>>, + <<"\r\nContent-Type: multipart/related; boundary=", Boundary/binary, "\r\n\r\n">> + ]. + +bulk_get_multipart_boundary() -> + Unique = couch_uuids:random(), + <<"--", Unique/binary>>. + receive_request_data(Req) -> receive_request_data(Req, chttpd:body_length(Req)). diff --git a/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl b/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl new file mode 100644 index 000000000..601f720a0 --- /dev/null +++ b/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl @@ -0,0 +1,304 @@ +%% Licensed under the Apache License, Version 2.0 (the "License"); you may not +%% use this file except in compliance with the License. You may obtain a copy of +%% the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +%% License for the specific language governing permissions and limitations under +%% the License. + +-module(chttpd_db_bulk_get_multipart_test). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-define(TIMEOUT, 3000). + + +setup() -> + mock(config), + mock(chttpd), + mock(couch_epi), + mock(couch_httpd), + mock(couch_stats), + mock(fabric), + mock(mochireq), + Pid = spawn_accumulator(), + Pid. + + +teardown(Pid) -> + ok = stop_accumulator(Pid), + meck:unload(). + + +bulk_get_test_() -> + { + "/db/_bulk_get tests", + { + foreach, fun setup/0, fun teardown/1, + [ + fun should_require_docs_field/1, + fun should_not_accept_specific_query_params/1, + fun should_return_empty_results_on_no_docs/1, + fun should_get_doc_with_all_revs/1, + fun should_validate_doc_with_bad_id/1, + fun should_validate_doc_with_bad_rev/1, + fun should_validate_missing_doc/1, + fun should_validate_bad_atts_since/1, + fun should_include_attachments_when_atts_since_specified/1 + ] + } + }. + + +should_require_docs_field(_) -> + Req = fake_request({[{}]}), + ?_assertThrow({bad_request, _}, chttpd_db:db_req(Req, nil)). + + +should_not_accept_specific_query_params(_) -> + Req = fake_request({[{<<"docs">>, []}]}), + lists:map(fun (Param) -> + {Param, ?_assertThrow({bad_request, _}, + begin + ok = meck:expect(chttpd, qs, + fun(_) -> [{Param, ""}] end), + chttpd_db:db_req(Req, nil) + end)} + end, ["rev", "open_revs", "atts_since", "w", "new_edits"]). + + +should_return_empty_results_on_no_docs(Pid) -> + Req = fake_request({[{<<"docs">>, []}]}), + chttpd_db:db_req(Req, nil), + Results = get_results_from_response(Pid), + ?_assertEqual([], Results). + + +should_get_doc_with_all_revs(Pid) -> + DocId = <<"docudoc">>, + Req = fake_request(DocId), + + DocRevA = #doc{id = DocId, body = {[{<<"_rev">>, <<"1-ABC">>}]}}, + DocRevB = #doc{id = DocId, body = {[{<<"_rev">>, <<"1-CDE">>}]}}, + + mock_open_revs(all, {ok, [{ok, DocRevA}, {ok, DocRevB}]}), + chttpd_db:db_req(Req, nil), + + Result = get_results_from_response(Pid), + ?_assertEqual(DocId, couch_util:get_value(<<"_id">>, Result)). + + +should_validate_doc_with_bad_id(Pid) -> + DocId = <<"_docudoc">>, + + Req = fake_request(DocId), + chttpd_db:db_req(Req, nil), + + Result = get_results_from_response(Pid), + ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), + + ?_assertMatch([{<<"id">>, DocId}, + {<<"rev">>, null}, + {<<"error">>, <<"illegal_docid">>}, + {<<"reason">>, _}], Result). + + +should_validate_doc_with_bad_rev(Pid) -> + DocId = <<"docudoc">>, + Rev = <<"revorev">>, + + Req = fake_request(DocId, Rev), + chttpd_db:db_req(Req, nil), + + Result = get_results_from_response(Pid), + ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), + + ?_assertMatch([{<<"id">>, DocId}, + {<<"rev">>, Rev}, + {<<"error">>, <<"bad_request">>}, + {<<"reason">>, _}], Result). + + +should_validate_missing_doc(Pid) -> + DocId = <<"docudoc">>, + Rev = <<"1-revorev">>, + + Req = fake_request(DocId, Rev), + mock_open_revs([{1,<<"revorev">>}], {ok, []}), + chttpd_db:db_req(Req, nil), + + Result = get_results_from_response(Pid), + ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), + + ?_assertMatch([{<<"id">>, DocId}, + {<<"rev">>, Rev}, + {<<"error">>, <<"not_found">>}, + {<<"reason">>, _}], Result). + + +should_validate_bad_atts_since(Pid) -> + DocId = <<"docudoc">>, + Rev = <<"1-revorev">>, + + Req = fake_request(DocId, Rev, <<"badattsince">>), + mock_open_revs([{1,<<"revorev">>}], {ok, []}), + chttpd_db:db_req(Req, nil), + + Result = get_results_from_response(Pid), + ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), + + ?_assertMatch([{<<"id">>, DocId}, + {<<"rev">>, <<"badattsince">>}, + {<<"error">>, <<"bad_request">>}, + {<<"reason">>, _}], Result). + + +should_include_attachments_when_atts_since_specified(_) -> + DocId = <<"docudoc">>, + Rev = <<"1-revorev">>, + + Req = fake_request(DocId, Rev, [<<"1-abc">>]), + mock_open_revs([{1,<<"revorev">>}], {ok, []}), + chttpd_db:db_req(Req, nil), + + ?_assert(meck:called(fabric, open_revs, + [nil, DocId, [{1, <<"revorev">>}], + [{atts_since, [{1, <<"abc">>}]}, attachments, + {user_ctx, undefined}]])). + +%% helpers + +fake_request(Payload) when is_tuple(Payload) -> + #httpd{method='POST', path_parts=[<<"db">>, <<"_bulk_get">>], + mochi_req=mochireq, req_body=Payload}; +fake_request(DocId) when is_binary(DocId) -> + fake_request({[{<<"docs">>, [{[{<<"id">>, DocId}]}]}]}). + +fake_request(DocId, Rev) -> + fake_request({[{<<"docs">>, [{[{<<"id">>, DocId}, {<<"rev">>, Rev}]}]}]}). + +fake_request(DocId, Rev, AttsSince) -> + fake_request({[{<<"docs">>, [{[{<<"id">>, DocId}, + {<<"rev">>, Rev}, + {<<"atts_since">>, AttsSince}]}]}]}). + + +mock_open_revs(RevsReq0, RevsResp) -> + ok = meck:expect(fabric, open_revs, + fun(_, _, RevsReq1, _) -> + ?assertEqual(RevsReq0, RevsReq1), + RevsResp + end). + + +mock(mochireq) -> + ok = meck:new(mochireq, [non_strict]), + ok = meck:expect(mochireq, parse_qs, fun() -> [] end), + ok = meck:expect(mochireq, accepts_content_type, fun("multipart/mixed") -> true; + ("multipart/related") -> true; + (_) -> false end), + ok; +mock(couch_httpd) -> + ok = meck:new(couch_httpd, [passthrough]), + ok = meck:expect(couch_httpd, validate_ctype, fun(_, _) -> ok end), + ok = meck:expect(couch_httpd, last_chunk, fun(_) -> {ok, nil} end), + ok = meck:expect(couch_httpd, send_chunk, fun send_chunk/2), + ok; +mock(chttpd) -> + ok = meck:new(chttpd, [passthrough]), + ok = meck:expect(chttpd, start_json_response, fun(_, _) -> {ok, nil} end), + ok = meck:expect(chttpd, start_chunked_response, fun(_, _, _) -> {ok, nil} end), + ok = meck:expect(chttpd, end_json_response, fun(_) -> ok end), + ok = meck:expect(chttpd, send_chunk, fun send_chunk/2), + ok = meck:expect(chttpd, json_body_obj, fun (#httpd{req_body=Body}) -> Body end), + ok; +mock(couch_epi) -> + ok = meck:new(couch_epi, [passthrough]), + ok = meck:expect(couch_epi, any, fun(_, _, _, _, _) -> false end), + ok; +mock(couch_stats) -> + ok = meck:new(couch_stats, [passthrough]), + ok = meck:expect(couch_stats, increment_counter, fun(_) -> ok end), + ok = meck:expect(couch_stats, increment_counter, fun(_, _) -> ok end), + ok = meck:expect(couch_stats, decrement_counter, fun(_) -> ok end), + ok = meck:expect(couch_stats, decrement_counter, fun(_, _) -> ok end), + ok = meck:expect(couch_stats, update_histogram, fun(_, _) -> ok end), + ok = meck:expect(couch_stats, update_gauge, fun(_, _) -> ok end), + ok; +mock(fabric) -> + ok = meck:new(fabric, [passthrough]), + ok; +mock(config) -> + ok = meck:new(config, [passthrough]), + ok = meck:expect(config, get, fun(_, _, Default) -> Default end), + ok. + + +spawn_accumulator() -> + Parent = self(), + Pid = spawn(fun() -> accumulator_loop(Parent, []) end), + erlang:put(chunks_gather, Pid), + Pid. + +accumulator_loop(Parent, Acc) -> + receive + {stop, Ref} -> + Parent ! {ok, Ref}; + {get, Ref} -> + Parent ! {ok, Ref, Acc}, + accumulator_loop(Parent, Acc); + {put, Ref, Chunk} -> + Parent ! {ok, Ref}, + accumulator_loop(Parent, [Chunk|Acc]) + end. + +stop_accumulator(Pid) -> + Ref = make_ref(), + Pid ! {stop, Ref}, + receive + {ok, Ref} -> + ok + after ?TIMEOUT -> + throw({timeout, <<"process stop timeout">>}) + end. + + +send_chunk(_, []) -> + {ok, nil}; +send_chunk(_Req, [H|T]=Chunk) when is_list(Chunk) -> + send_chunk(_Req, H), + send_chunk(_Req, T); +send_chunk(_, Chunk) -> + Worker = erlang:get(chunks_gather), + Ref = make_ref(), + Worker ! {put, Ref, Chunk}, + receive + {ok, Ref} -> {ok, nil} + after ?TIMEOUT -> + throw({timeout, <<"send chunk timeout">>}) + end. + + +get_response(Pid) -> + Ref = make_ref(), + Pid ! {get, Ref}, + receive + {ok, Ref, Acc} -> + Acc + after ?TIMEOUT -> + throw({timeout, <<"get response timeout">>}) + end. + +get_results_from_response(Pid) -> + case get_response(Pid) of + [] -> + []; + Result -> + {Result1} = ?JSON_DECODE(lists:nth(2, Result)), + Result1 + end. |