diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2018-10-25 14:19:07 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-01-18 13:03:28 -0600 |
commit | 3a1dd0a7e018c51d64c6b43ecde958b003e02883 (patch) | |
tree | e0956937204622715f4ac79165fcef171be6d8ad | |
parent | 6edb9a54b7111fdb0a1713d1af2268271e27d02f (diff) | |
download | couchdb-3a1dd0a7e018c51d64c6b43ecde958b003e02883.tar.gz |
Implement partitioned views
The benefit of using partitioned databases is that views can then be
scoped to a single shard range. This allows views to scale nearly as
linearly as document lookups do.
Co-authored-by: Garren Smith <garren.smith@gmail.com>
Co-authored-by: Robert Newson <rnewson@apache.org>
21 files changed, 384 insertions, 61 deletions
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index c4f3686fb..bcd082448 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -22,7 +22,8 @@ db_req/2, couch_doc_open/4,handle_changes_req/2, update_doc_result_to_json/1, update_doc_result_to_json/2, handle_design_info_req/3, handle_view_cleanup_req/2, - update_doc/4, http_code_from_status/1]). + update_doc/4, http_code_from_status/1, + handle_partition_req/2]). -import(chttpd, [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2, @@ -55,6 +56,11 @@ orelse T == <<"_local_docs">> orelse T == <<"_design_docs">>)). +-define(IS_MANGO(T), ( + T == <<"_index">> + orelse T == <<"_find">> + orelse T == <<"_explain">>)). + % Database request handlers handle_request(#httpd{path_parts=[DbName|RestParts],method=Method}=Req)-> case {Method, RestParts} of @@ -254,6 +260,51 @@ handle_view_cleanup_req(Req, Db) -> ok = fabric:cleanup_index_files_all_nodes(Db), send_json(Req, 202, {[{ok, true}]}). 
+ +handle_partition_req(#httpd{method='GET', path_parts=[_,_,PartId]}=Req, Db) -> + couch_partition:validate_partition(PartId), + case couch_db:is_partitioned(Db) of + true -> + {ok, PartitionInfo} = fabric:get_partition_info(Db, PartId), + send_json(Req, {PartitionInfo}); + false -> + throw({bad_request, <<"database is not partitioned">>}) + end; + +handle_partition_req(#httpd{path_parts = [_, _, _]}=Req, _Db) -> + send_method_not_allowed(Req, "GET"); + +handle_partition_req(#httpd{path_parts=[DbName, _, PartId | Rest]}=Req, Db) -> + case couch_db:is_partitioned(Db) of + true -> + couch_partition:validate_partition(PartId), + QS = chttpd:qs(Req), + NewQS = lists:ukeysort(1, [{"partition", ?b2l(PartId)} | QS]), + NewReq = Req#httpd{ + path_parts = [DbName | Rest], + qs = NewQS + }, + case Rest of + [OP | _] when OP == <<"_all_docs">> orelse ?IS_MANGO(OP) -> + case chttpd_handlers:db_handler(OP, fun db_req/2) of + Handler when is_function(Handler, 2) -> + Handler(NewReq, Db); + _ -> + chttpd:send_error(Req, not_found) + end; + [<<"_design">>, _Name, <<"_", _/binary>> | _] -> + handle_design_req(NewReq, Db); + _ -> + chttpd:send_error(Req, not_found) + end; + false -> + throw({bad_request, <<"database is not partitioned">>}) + end; + +handle_partition_req(Req, _Db) -> + chttpd:send_error(Req, not_found). 
+ + handle_design_req(#httpd{ path_parts=[_DbName, _Design, Name, <<"_",_/binary>> = Action | _Rest] }=Req, Db) -> @@ -752,7 +803,7 @@ multi_all_docs_view(Req, Db, OP, Queries) -> ArgQueries = lists:map(fun({Query}) -> QueryArg1 = couch_mrview_http:parse_params(Query, undefined, Args1, [decoded]), - QueryArgs2 = couch_mrview_util:validate_args(QueryArg1), + QueryArgs2 = fabric_util:validate_all_docs_args(Db, QueryArg1), set_namespace(OP, QueryArgs2) end, Queries), Options = [{user_ctx, Req#httpd.user_ctx}], @@ -772,7 +823,7 @@ multi_all_docs_view(Req, Db, OP, Queries) -> all_docs_view(Req, Db, Keys, OP) -> Args0 = couch_mrview_http:parse_params(Req, Keys), Args1 = Args0#mrargs{view_type=map}, - Args2 = couch_mrview_util:validate_args(Args1), + Args2 = fabric_util:validate_all_docs_args(Db, Args1), Args3 = set_namespace(OP, Args2), Options = [{user_ctx, Req#httpd.user_ctx}], Max = chttpd:chunked_response_buffer_size(), @@ -1769,8 +1820,8 @@ set_namespace(<<"_local_docs">>, Args) -> set_namespace(<<"_local">>, Args); set_namespace(<<"_design_docs">>, Args) -> set_namespace(<<"_design">>, Args); -set_namespace(NS, #mrargs{extra = Extra} = Args) -> - Args#mrargs{extra = [{namespace, NS} | Extra]}. +set_namespace(NS, #mrargs{} = Args) -> + couch_mrview_util:set_extra(Args, namespace, NS). %% /db/_bulk_get stuff diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl index cb52e2c40..000f29b2f 100644 --- a/src/chttpd/src/chttpd_httpd_handlers.erl +++ b/src/chttpd/src/chttpd_httpd_handlers.erl @@ -32,6 +32,7 @@ url_handler(_) -> no_match. 
db_handler(<<"_view_cleanup">>) -> fun chttpd_db:handle_view_cleanup_req/2; db_handler(<<"_compact">>) -> fun chttpd_db:handle_compact_req/2; db_handler(<<"_design">>) -> fun chttpd_db:handle_design_req/2; +db_handler(<<"_partition">>) -> fun chttpd_db:handle_partition_req/2; db_handler(<<"_temp_view">>) -> fun chttpd_view:handle_temp_view_req/2; db_handler(<<"_changes">>) -> fun chttpd_db:handle_changes_req/2; db_handler(_) -> no_match. diff --git a/src/chttpd/src/chttpd_view.erl b/src/chttpd/src/chttpd_view.erl index 3c05c64ca..1fce165f9 100644 --- a/src/chttpd/src/chttpd_view.erl +++ b/src/chttpd/src/chttpd_view.erl @@ -24,7 +24,7 @@ multi_query_view(Req, Db, DDoc, ViewName, Queries) -> QueryArg = couch_mrview_http:parse_params(Query, undefined, Args1, [decoded]), QueryArg1 = couch_mrview_util:set_view_type(QueryArg, ViewName, Views), - couch_mrview_util:validate_args(QueryArg1) + fabric_util:validate_args(Db, DDoc, QueryArg1) end, Queries), Options = [{user_ctx, Req#httpd.user_ctx}], VAcc0 = #vacc{db=Db, req=Req, prepend="\r\n"}, @@ -122,17 +122,19 @@ check_multi_query_reduce_view_overrides_test_() -> t_check_include_docs_throw_validation_error() -> ?_test(begin Req = #httpd{qs = []}, + Db = test_util:fake_db([{name, <<"foo">>}]), Query = {[{<<"include_docs">>, true}]}, Throw = {query_parse_error, <<"`include_docs` is invalid for reduce">>}, - ?assertThrow(Throw, multi_query_view(Req, db, ddoc, <<"v">>, [Query])) + ?assertThrow(Throw, multi_query_view(Req, Db, ddoc, <<"v">>, [Query])) end). t_check_user_can_override_individual_query_type() -> ?_test(begin Req = #httpd{qs = []}, + Db = test_util:fake_db([{name, <<"foo">>}]), Query = {[{<<"include_docs">>, true}, {<<"reduce">>, false}]}, - multi_query_view(Req, db, ddoc, <<"v">>, [Query]), + multi_query_view(Req, Db, ddoc, <<"v">>, [Query]), ?assertEqual(1, meck:num_calls(chttpd, start_delayed_json_response, '_')) end). 
diff --git a/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl b/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl index 601f720a0..8a95c92ac 100644 --- a/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl +++ b/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl @@ -57,24 +57,27 @@ bulk_get_test_() -> should_require_docs_field(_) -> Req = fake_request({[{}]}), - ?_assertThrow({bad_request, _}, chttpd_db:db_req(Req, nil)). + Db = test_util:fake_db([{name, <<"foo">>}]), + ?_assertThrow({bad_request, _}, chttpd_db:db_req(Req, Db)). should_not_accept_specific_query_params(_) -> Req = fake_request({[{<<"docs">>, []}]}), + Db = test_util:fake_db([{name, <<"foo">>}]), lists:map(fun (Param) -> {Param, ?_assertThrow({bad_request, _}, begin ok = meck:expect(chttpd, qs, fun(_) -> [{Param, ""}] end), - chttpd_db:db_req(Req, nil) + chttpd_db:db_req(Req, Db) end)} end, ["rev", "open_revs", "atts_since", "w", "new_edits"]). should_return_empty_results_on_no_docs(Pid) -> Req = fake_request({[{<<"docs">>, []}]}), - chttpd_db:db_req(Req, nil), + Db = test_util:fake_db([{name, <<"foo">>}]), + chttpd_db:db_req(Req, Db), Results = get_results_from_response(Pid), ?_assertEqual([], Results). @@ -82,12 +85,13 @@ should_return_empty_results_on_no_docs(Pid) -> should_get_doc_with_all_revs(Pid) -> DocId = <<"docudoc">>, Req = fake_request(DocId), + Db = test_util:fake_db([{name, <<"foo">>}]), DocRevA = #doc{id = DocId, body = {[{<<"_rev">>, <<"1-ABC">>}]}}, DocRevB = #doc{id = DocId, body = {[{<<"_rev">>, <<"1-CDE">>}]}}, mock_open_revs(all, {ok, [{ok, DocRevA}, {ok, DocRevB}]}), - chttpd_db:db_req(Req, nil), + chttpd_db:db_req(Req, Db), Result = get_results_from_response(Pid), ?_assertEqual(DocId, couch_util:get_value(<<"_id">>, Result)). 
@@ -97,7 +101,8 @@ should_validate_doc_with_bad_id(Pid) -> DocId = <<"_docudoc">>, Req = fake_request(DocId), - chttpd_db:db_req(Req, nil), + Db = test_util:fake_db([{name, <<"foo">>}]), + chttpd_db:db_req(Req, Db), Result = get_results_from_response(Pid), ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), @@ -113,7 +118,8 @@ should_validate_doc_with_bad_rev(Pid) -> Rev = <<"revorev">>, Req = fake_request(DocId, Rev), - chttpd_db:db_req(Req, nil), + Db = test_util:fake_db([{name, <<"foo">>}]), + chttpd_db:db_req(Req, Db), Result = get_results_from_response(Pid), ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), @@ -129,8 +135,9 @@ should_validate_missing_doc(Pid) -> Rev = <<"1-revorev">>, Req = fake_request(DocId, Rev), + Db = test_util:fake_db([{name, <<"foo">>}]), mock_open_revs([{1,<<"revorev">>}], {ok, []}), - chttpd_db:db_req(Req, nil), + chttpd_db:db_req(Req, Db), Result = get_results_from_response(Pid), ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), @@ -146,8 +153,9 @@ should_validate_bad_atts_since(Pid) -> Rev = <<"1-revorev">>, Req = fake_request(DocId, Rev, <<"badattsince">>), + Db = test_util:fake_db([{name, <<"foo">>}]), mock_open_revs([{1,<<"revorev">>}], {ok, []}), - chttpd_db:db_req(Req, nil), + chttpd_db:db_req(Req, Db), Result = get_results_from_response(Pid), ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), @@ -163,11 +171,12 @@ should_include_attachments_when_atts_since_specified(_) -> Rev = <<"1-revorev">>, Req = fake_request(DocId, Rev, [<<"1-abc">>]), + Db = test_util:fake_db([{name, <<"foo">>}]), mock_open_revs([{1,<<"revorev">>}], {ok, []}), - chttpd_db:db_req(Req, nil), + chttpd_db:db_req(Req, Db), ?_assert(meck:called(fabric, open_revs, - [nil, DocId, [{1, <<"revorev">>}], + ['_', DocId, [{1, <<"revorev">>}], [{atts_since, [{1, <<"abc">>}]}, attachments, {user_ctx, undefined}]])). 
diff --git a/src/chttpd/test/chttpd_db_bulk_get_test.erl b/src/chttpd/test/chttpd_db_bulk_get_test.erl index 908d1f022..864e7079a 100644 --- a/src/chttpd/test/chttpd_db_bulk_get_test.erl +++ b/src/chttpd/test/chttpd_db_bulk_get_test.erl @@ -95,7 +95,7 @@ should_get_doc_with_all_revs(Pid) -> DocRevB = #doc{id = DocId, body = {[{<<"_rev">>, <<"1-CDE">>}]}}, mock_open_revs(all, {ok, [{ok, DocRevA}, {ok, DocRevB}]}), - chttpd_db:db_req(Req, nil), + chttpd_db:db_req(Req, test_util:fake_db([{name, <<"foo">>}])), [{Result}] = get_results_from_response(Pid), ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), @@ -115,7 +115,7 @@ should_validate_doc_with_bad_id(Pid) -> DocId = <<"_docudoc">>, Req = fake_request(DocId), - chttpd_db:db_req(Req, nil), + chttpd_db:db_req(Req, test_util:fake_db([{name, <<"foo">>}])), [{Result}] = get_results_from_response(Pid), ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), @@ -138,7 +138,7 @@ should_validate_doc_with_bad_rev(Pid) -> Rev = <<"revorev">>, Req = fake_request(DocId, Rev), - chttpd_db:db_req(Req, nil), + chttpd_db:db_req(Req, test_util:fake_db([{name, <<"foo">>}])), [{Result}] = get_results_from_response(Pid), ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), @@ -162,7 +162,7 @@ should_validate_missing_doc(Pid) -> Req = fake_request(DocId, Rev), mock_open_revs([{1,<<"revorev">>}], {ok, []}), - chttpd_db:db_req(Req, nil), + chttpd_db:db_req(Req, test_util:fake_db([{name, <<"foo">>}])), [{Result}] = get_results_from_response(Pid), ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), @@ -186,7 +186,7 @@ should_validate_bad_atts_since(Pid) -> Req = fake_request(DocId, Rev, <<"badattsince">>), mock_open_revs([{1,<<"revorev">>}], {ok, []}), - chttpd_db:db_req(Req, nil), + chttpd_db:db_req(Req, test_util:fake_db([{name, <<"foo">>}])), [{Result}] = get_results_from_response(Pid), ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)), @@ -210,10 +210,10 @@ 
should_include_attachments_when_atts_since_specified(_) -> Req = fake_request(DocId, Rev, [<<"1-abc">>]), mock_open_revs([{1,<<"revorev">>}], {ok, []}), - chttpd_db:db_req(Req, nil), + chttpd_db:db_req(Req, test_util:fake_db([{name, <<"foo">>}])), ?_assert(meck:called(fabric, open_revs, - [nil, DocId, [{1, <<"revorev">>}], + ['_', DocId, [{1, <<"revorev">>}], [{atts_since, [{1, <<"abc">>}]}, attachments, {user_ctx, undefined}]])). diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl index daf846ba8..ea0cf69e9 100644 --- a/src/couch/src/couch_btree.erl +++ b/src/couch/src/couch_btree.erl @@ -133,7 +133,9 @@ make_group_fun(Bt, exact) -> end; make_group_fun(Bt, GroupLevel) when is_integer(GroupLevel), GroupLevel > 0 -> fun - ({[_|_] = Key1, _}, {[_|_] = Key2, _}) -> + GF({{p, Partition, Key1}, Val1}, {{p, Partition, Key2}, Val2}) -> + GF({Key1, Val1}, {Key2, Val2}); + GF({[_|_] = Key1, _}, {[_|_] = Key2, _}) -> SL1 = lists:sublist(Key1, GroupLevel), SL2 = lists:sublist(Key2, GroupLevel), case less(Bt, {SL1, nil}, {SL2, nil}) of @@ -147,7 +149,7 @@ make_group_fun(Bt, GroupLevel) when is_integer(GroupLevel), GroupLevel > 0 -> _ -> false end; - ({Key1, _}, {Key2, _}) -> + GF({Key1, _}, {Key2, _}) -> case less(Bt, {Key1, nil}, {Key2, nil}) of false -> case less(Bt, {Key2, nil}, {Key1, nil}) of diff --git a/src/couch/src/couch_ejson_compare.erl b/src/couch/src/couch_ejson_compare.erl index 81adbb8f5..ca36c8656 100644 --- a/src/couch/src/couch_ejson_compare.erl +++ b/src/couch/src/couch_ejson_compare.erl @@ -22,6 +22,10 @@ init() -> Dir = code:priv_dir(couch), ok = erlang:load_nif(filename:join(Dir, ?MODULE), NumScheds). 
+% partitioned row comparison +less({p, PA, A}, {p, PB, B}) -> + less([PA, A], [PB, B]); + less(A, B) -> try less_nif(A, B) diff --git a/src/couch/src/couch_partition.erl b/src/couch/src/couch_partition.erl index 9ff77a0ff..f2efcaa5e 100644 --- a/src/couch/src/couch_partition.erl +++ b/src/couch/src/couch_partition.erl @@ -20,6 +20,7 @@ start_key/1, end_key/1, + shard_key/1, validate_dbname/2, validate_docid/1, @@ -70,6 +71,10 @@ end_key(Partition) -> <<Partition/binary, ";">>. +shard_key(Partition) -> + <<Partition/binary, ":foo">>. + + validate_dbname(DbName, Options) when is_list(DbName) -> validate_dbname(?l2b(DbName), Options); validate_dbname(DbName, Options) when is_binary(DbName) -> diff --git a/src/couch_mrview/include/couch_mrview.hrl b/src/couch_mrview/include/couch_mrview.hrl index a341e30db..e17aaba93 100644 --- a/src/couch_mrview/include/couch_mrview.hrl +++ b/src/couch_mrview/include/couch_mrview.hrl @@ -20,6 +20,7 @@ design_opts=[], seq_indexed=false, keyseq_indexed=false, + partitioned=false, lib, views, id_btree=nil, diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl index 391acf412..ae1d8d6f5 100644 --- a/src/couch_mrview/src/couch_mrview.erl +++ b/src/couch_mrview/src/couch_mrview.erl @@ -59,6 +59,7 @@ validate_ddoc_fields(DDoc) -> [{<<"options">>, object}], [{<<"options">>, object}, {<<"include_design">>, boolean}], [{<<"options">>, object}, {<<"local_seq">>, boolean}], + [{<<"options">>, object}, {<<"partitioned">>, boolean}], [{<<"rewrites">>, [string, array]}], [{<<"shows">>, object}, {any, [object, string]}], [{<<"updates">>, object}, {any, [object, string]}], @@ -200,9 +201,19 @@ validate(Db, DDoc) -> end, {ok, #mrst{ language = Lang, - views = Views + views = Views, + partitioned = Partitioned }} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc), + case {couch_db:is_partitioned(Db), Partitioned} of + {false, true} -> + throw({invalid_design_doc, + <<"partitioned option cannot be true in a " 
+ "non-partitioned database.">>}); + {_, _} -> + ok + end, + try Views =/= [] andalso couch_query_servers:get_os_process(Lang) of false -> ok; @@ -230,7 +241,7 @@ query_all_docs(Db, Args0, Callback, Acc) -> couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(Info))) end), Args1 = Args0#mrargs{view_type=map}, - Args2 = couch_mrview_util:validate_args(Args1), + Args2 = couch_mrview_util:validate_all_docs_args(Db, Args1), {ok, Acc1} = case Args2#mrargs.preflight_fun of PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc); _ -> {ok, Acc} @@ -616,6 +627,8 @@ red_fold(Db, {NthRed, _Lang, View}=RedView, Args, Callback, UAcc) -> end, Acc, OptList), finish_fold(Acc2, []). +red_fold({p, _Partition, Key}, Red, Acc) -> + red_fold(Key, Red, Acc); red_fold(_Key, _Red, #mracc{skip=N}=Acc) when N > 0 -> {ok, Acc#mracc{skip=N-1, last_go=ok}}; red_fold(Key, Red, #mracc{meta_sent=false}=Acc) -> diff --git a/src/couch_mrview/src/couch_mrview_http.erl b/src/couch_mrview/src/couch_mrview_http.erl index 004caef09..cdf498e5d 100644 --- a/src/couch_mrview/src/couch_mrview_http.erl +++ b/src/couch_mrview/src/couch_mrview_http.erl @@ -296,7 +296,7 @@ multi_query_view(Req, Db, DDoc, ViewName, Queries) -> {ok, _, _, Args1} = couch_mrview_util:get_view(Db, DDoc, ViewName, Args0), ArgQueries = lists:map(fun({Query}) -> QueryArg = parse_params(Query, undefined, Args1), - couch_mrview_util:validate_args(QueryArg) + couch_mrview_util:validate_args(Db, DDoc, QueryArg) end, Queries), {ok, Resp2} = couch_httpd:etag_maybe(Req, fun() -> Max = chttpd:chunked_response_buffer_size(), @@ -582,6 +582,10 @@ parse_param(Key, Val, Args, IsDecoded) -> Args#mrargs{callback=couch_util:to_binary(Val)}; "sorted" -> Args#mrargs{sorted=parse_boolean(Val)}; + "partition" -> + Partition = couch_util:to_binary(Val), + couch_partition:validate_partition(Partition), + couch_mrview_util:set_extra(Args, partition, Partition); _ -> BKey = couch_util:to_binary(Key), BVal = couch_util:to_binary(Val), diff --git 
a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl index d3bcfe04b..ac433335c 100644 --- a/src/couch_mrview/src/couch_mrview_index.erl +++ b/src/couch_mrview/src/couch_mrview_index.erl @@ -40,10 +40,12 @@ get(update_options, #mrst{design_opts = Opts}) -> LocalSeq = couch_util:get_value(<<"local_seq">>, Opts, false), SeqIndexed = couch_util:get_value(<<"seq_indexed">>, Opts, false), KeySeqIndexed = couch_util:get_value(<<"keyseq_indexed">>, Opts, false), + Partitioned = couch_util:get_value(<<"partitioned">>, Opts, false), if IncDesign -> [include_design]; true -> [] end ++ if LocalSeq -> [local_seq]; true -> [] end ++ if KeySeqIndexed -> [keyseq_indexed]; true -> [] end - ++ if SeqIndexed -> [seq_indexed]; true -> [] end; + ++ if SeqIndexed -> [seq_indexed]; true -> [] end + ++ if Partitioned -> [partitioned]; true -> [] end; get(fd, #mrst{fd = Fd}) -> Fd; get(language, #mrst{language = Language}) -> @@ -94,14 +96,15 @@ get(Other, _) -> init(Db, DDoc) -> - couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc). + {ok, State} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc), + {ok, set_partitioned(Db, State)}. -open(Db, State) -> +open(Db, State0) -> #mrst{ db_name=DbName, sig=Sig - } = State, + } = State = set_partitioned(Db, State0), IndexFName = couch_mrview_util:index_file(DbName, Sig), % If we are upgrading from <=1.2.x, we upgrade the view @@ -244,6 +247,26 @@ verify_index_exists(DbName, Props) -> end. +set_partitioned(Db, State) -> + #mrst{ + design_opts = DesignOpts + } = State, + DbPartitioned = couch_db:is_partitioned(Db), + ViewPartitioned = couch_util:get_value( + <<"partitioned">>, DesignOpts, DbPartitioned), + IsPartitioned = case {DbPartitioned, ViewPartitioned} of + {true, true} -> + true; + {true, false} -> + false; + {false, false} -> + false; + _ -> + throw({bad_request, <<"invalid partition option">>}) + end, + State#mrst{partitioned = IsPartitioned}. 
+ + ensure_local_purge_docs(DbName, DDocs) -> couch_util:with_db(DbName, fun(Db) -> lists:foreach(fun(DDoc) -> diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl index 3383b49b6..9740e6a28 100644 --- a/src/couch_mrview/src/couch_mrview_updater.erl +++ b/src/couch_mrview/src/couch_mrview_updater.erl @@ -65,7 +65,8 @@ purge(_Db, PurgeSeq, PurgedIdRevs, State) -> #mrst{ id_btree=IdBtree, log_btree=LogBtree, - views=Views + views=Views, + partitioned=Partitioned } = State, Ids = [Id || {Id, _Revs} <- PurgedIdRevs], @@ -84,7 +85,11 @@ purge(_Db, PurgeSeq, PurgedIdRevs, State) -> FoldFun = fun ({ViewNum, {Key, Seq, _Op}}, DictAcc2) -> dict:append(ViewNum, {Key, Seq, DocId}, DictAcc2); - ({ViewNum, RowKey}, DictAcc2) -> + ({ViewNum, RowKey0}, DictAcc2) -> + RowKey = if not Partitioned -> RowKey0; true -> + [{RK, _}] = inject_partition([{RowKey0, DocId}]), + RK + end, dict:append(ViewNum, {RowKey, DocId}, DictAcc2) end, lists:foldl(FoldFun, DictAcc, ViewNumRowKeys); @@ -315,7 +320,8 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) -> #mrst{ id_btree=IdBtree, log_btree=LogBtree, - first_build=FirstBuild + first_build=FirstBuild, + partitioned=Partitioned } = State, Revs = dict:from_list(dict:fetch_keys(Log0)), @@ -332,9 +338,17 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) -> _ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild) end, - UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) -> + UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs0, SKVs}}) -> #mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View, - ToRem = couch_util:dict_find(ViewId, ToRemByView, []), + ToRem0 = couch_util:dict_find(ViewId, ToRemByView, []), + {KVs, ToRem} = case Partitioned of + true -> + KVs1 = inject_partition(KVs0), + ToRem1 = inject_partition(ToRem0), + {KVs1, ToRem1}; + false -> + {KVs0, ToRem0} + end, {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, 
ToRem), NewUpdateSeq = case VBtree2 =/= View#mrview.btree of true -> UpdateSeq; @@ -382,6 +396,20 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) -> log_btree=LogBtree2 }. + +inject_partition(Rows) -> + lists:map(fun + ({{Key, DocId}, Value}) -> + % Adding a row to the view + {Partition, _} = couch_partition:extract(DocId), + {{{p, Partition, Key}, DocId}, Value}; + ({Key, DocId}) -> + % Removing a row based on values in id_tree + {Partition, _} = couch_partition:extract(DocId), + {{p, Partition, Key}, DocId} + end, Rows). + + update_id_btree(Btree, DocIdKeys, true) -> ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []], couch_btree:query_modify(Btree, [], ToAdd, []); diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index 4fd82e0af..b879d1242 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -26,12 +26,13 @@ -export([temp_view_to_ddoc/1]). -export([calculate_external_size/1]). -export([calculate_active_size/1]). --export([validate_args/1]). +-export([validate_all_docs_args/2, validate_args/1, validate_args/3]). -export([maybe_load_doc/3, maybe_load_doc/4]). -export([maybe_update_index_file/1]). -export([extract_view/4, extract_view_reduce/1]). -export([get_view_keys/1, get_view_queries/1]). -export([set_view_type/3]). +-export([set_extra/3, get_extra/2, get_extra/3]). -export([changes_key_opts/2]). -export([fold_changes/4]). -export([to_key_seq/1]). @@ -39,6 +40,10 @@ -define(MOD, couch_mrview_index). -define(GET_VIEW_RETRY_COUNT, 1). -define(GET_VIEW_RETRY_DELAY, 50). +-define(LOWEST_KEY, null). +-define(HIGHEST_KEY, {<<255, 255, 255, 255>>}). +-define(LOWEST(A, B), (if A < B -> A; true -> B end)). +-define(HIGHEST(A, B), (if A > B -> A; true -> B end)). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). 
@@ -94,7 +99,7 @@ get_view(Db, DDoc, ViewName, Args0) -> get_view_index_pid(Db, DDoc, ViewName, Args0) -> ArgCheck = fun(InitState) -> Args1 = set_view_type(Args0, ViewName, InitState#mrst.views), - {ok, validate_args(Args1)} + {ok, validate_args(InitState, Args1)} end, couch_index_server:get_index(?MOD, Db, DDoc, ArgCheck). @@ -169,6 +174,7 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> {DesignOpts} = proplists:get_value(<<"options">>, Fields, {[]}), SeqIndexed = proplists:get_value(<<"seq_indexed">>, DesignOpts, false), KeySeqIndexed = proplists:get_value(<<"keyseq_indexed">>, DesignOpts, false), + Partitioned = proplists:get_value(<<"partitioned">>, DesignOpts, false), {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}), BySrc = lists:foldl(MakeDict, dict:new(), RawViews), @@ -189,7 +195,8 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> language=Language, design_opts=DesignOpts, seq_indexed=SeqIndexed, - keyseq_indexed=KeySeqIndexed + keyseq_indexed=KeySeqIndexed, + partitioned=Partitioned }, SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)}, {ok, IdxState#mrst{sig=couch_hash:md5_hash(term_to_binary(SigInfo))}}. @@ -213,6 +220,19 @@ set_view_type(Args, ViewName, [View | Rest]) -> end. +set_extra(#mrargs{} = Args, Key, Value) -> + Extra0 = Args#mrargs.extra, + Extra1 = lists:ukeysort(1, [{Key, Value} | Extra0]), + Args#mrargs{extra = Extra1}. + + +get_extra(#mrargs{} = Args, Key) -> + couch_util:get_value(Key, Args#mrargs.extra). + +get_extra(#mrargs{} = Args, Key, Default) -> + couch_util:get_value(Key, Args#mrargs.extra, Default). + + extract_view(_Lang, _Args, _ViewName, []) -> throw({not_found, missing_named_view}); extract_view(Lang, #mrargs{view_type=map}=Args, Name, [View | Rest]) -> @@ -476,6 +496,49 @@ fold_reduce({NthRed, Lang, View}, Fun, Acc, Options) -> couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options). 
+validate_args(Db, DDoc, Args) -> + {ok, State} = couch_mrview_index:init(Db, DDoc), + validate_args(State, Args). + + +validate_args(#mrst{} = State, Args0) -> + Args = validate_args(Args0), + + ViewPartitioned = State#mrst.partitioned, + Partition = get_extra(Args, partition), + + case {ViewPartitioned, Partition} of + {true, undefined} -> + Msg1 = <<"`partition` parameter is mandatory " + "for queries to this view.">>, + mrverror(Msg1); + {true, _} -> + apply_partition(Args, Partition); + {false, undefined} -> + Args; + {false, Value} when is_binary(Value) -> + Msg2 = <<"`partition` parameter is not " + "supported in this design doc">>, + mrverror(Msg2) + end. + + +validate_all_docs_args(Db, Args0) -> + Args = validate_args(Args0), + + DbPartitioned = couch_db:is_partitioned(Db), + Partition = get_extra(Args, partition), + + case {DbPartitioned, Partition} of + {false, <<_/binary>>} -> + mrverror(<<"`partition` parameter is not supported on this db">>); + {_, <<_/binary>>} -> + apply_all_docs_partition(Args, Partition); + _ -> + Args + end. + + validate_args(Args) -> GroupLevel = determine_group_level(Args), Reduce = Args#mrargs.reduce, @@ -598,6 +661,12 @@ validate_args(Args) -> _ -> mrverror(<<"Invalid value for `sorted`.">>) end, + case get_extra(Args, partition) of + undefined -> ok; + Partition when is_binary(Partition), Partition /= <<>> -> ok; + _ -> mrverror(<<"Invalid value for `partition`.">>) + end, + Args#mrargs{ start_key_docid=SKDocId, end_key_docid=EKDocId, @@ -616,6 +685,70 @@ determine_group_level(#mrargs{group=true, group_level=undefined}) -> determine_group_level(#mrargs{group_level=GroupLevel}) -> GroupLevel. 
+apply_partition(#mrargs{keys=[{p, _, _} | _]} = Args, _Partition) -> + Args; % already applied + +apply_partition(#mrargs{keys=Keys} = Args, Partition) when Keys /= undefined -> + Args#mrargs{keys=[{p, Partition, K} || K <- Keys]}; + +apply_partition(#mrargs{start_key={p, _, _}, end_key={p, _, _}} = Args, _Partition) -> + Args; % already applied. + +apply_partition(Args, Partition) -> + #mrargs{ + direction = Dir, + start_key = StartKey, + end_key = EndKey + } = Args, + + {DefSK, DefEK} = case Dir of + fwd -> {?LOWEST_KEY, ?HIGHEST_KEY}; + rev -> {?HIGHEST_KEY, ?LOWEST_KEY} + end, + + SK0 = if StartKey /= undefined -> StartKey; true -> DefSK end, + EK0 = if EndKey /= undefined -> EndKey; true -> DefEK end, + + Args#mrargs{ + start_key = {p, Partition, SK0}, + end_key = {p, Partition, EK0} + }. + +%% all_docs is special as it's not really a view and is already +%% effectively partitioned as the partition is a prefix of all keys. +apply_all_docs_partition(#mrargs{} = Args, Partition) -> + #mrargs{ + direction = Dir, + start_key = StartKey, + end_key = EndKey + } = Args, + + {DefSK, DefEK} = case Dir of + fwd -> + { + couch_partition:start_key(Partition), + couch_partition:end_key(Partition) + }; + rev -> + { + couch_partition:end_key(Partition), + couch_partition:start_key(Partition) + } + end, + + SK0 = if StartKey == undefined -> DefSK; true -> StartKey end, + EK0 = if EndKey == undefined -> DefEK; true -> EndKey end, + + {SK1, EK1} = case Dir of + fwd -> {?HIGHEST(DefSK, SK0), ?LOWEST(DefEK, EK0)}; + rev -> {?LOWEST(DefSK, SK0), ?HIGHEST(DefEK, EK0)} + end, + + Args#mrargs{ + start_key = SK1, + end_key = EK1 + }. 
+ check_range(#mrargs{start_key=undefined}, _Cmp) -> ok; diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl index 7476ff7b2..6d04184e6 100644 --- a/src/fabric/src/fabric.erl +++ b/src/fabric/src/fabric.erl @@ -392,10 +392,11 @@ query_view(Db, Options, GroupId, ViewName, Callback, Acc0, QueryArgs) when is_binary(GroupId) -> DbName = dbname(Db), {ok, DDoc} = ddoc_cache:open(DbName, <<"_design/", GroupId/binary>>), - query_view(DbName, Options, DDoc, ViewName, Callback, Acc0, QueryArgs); -query_view(DbName, Options, DDoc, ViewName, Callback, Acc0, QueryArgs0) -> - Db = dbname(DbName), View = name(ViewName), - case fabric_util:is_users_db(Db) of + query_view(Db, Options, DDoc, ViewName, Callback, Acc0, QueryArgs); +query_view(Db, Options, DDoc, ViewName, Callback, Acc0, QueryArgs0) -> + DbName = dbname(Db), + View = name(ViewName), + case fabric_util:is_users_db(DbName) of true -> FakeDb = fabric_util:open_cluster_db(DbName, Options), couch_users_db:after_doc_read(DDoc, FakeDb); @@ -403,9 +404,9 @@ query_view(DbName, Options, DDoc, ViewName, Callback, Acc0, QueryArgs0) -> ok end, {ok, #mrst{views=Views, language=Lang}} = - couch_mrview_util:ddoc_to_mrst(Db, DDoc), + couch_mrview_util:ddoc_to_mrst(DbName, DDoc), QueryArgs1 = couch_mrview_util:set_view_type(QueryArgs0, View, Views), - QueryArgs2 = couch_mrview_util:validate_args(QueryArgs1), + QueryArgs2 = fabric_util:validate_args(Db, DDoc, QueryArgs1), VInfo = couch_mrview_util:extract_view(Lang, QueryArgs2, View, Views), case is_reduce_view(QueryArgs2) of true -> diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl index ae0c2be55..288c67cab 100644 --- a/src/fabric/src/fabric_streams.erl +++ b/src/fabric/src/fabric_streams.erl @@ -39,7 +39,6 @@ start(Workers0, Keypos, StartFun, Replacements) -> Timeout = fabric_util:request_timeout(), case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of {ok, #stream_acc{workers=Workers}} -> - true = 
fabric_view:is_progress_possible(Workers), AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) -> rexi:stream_start(From), [Worker | WorkerAcc] diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 5a1585fbc..d65d3c81d 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -20,6 +20,7 @@ -export([is_users_db/1, is_replicator_db/1]). -export([open_cluster_db/1, open_cluster_db/2]). -export([is_partitioned/1]). +-export([validate_all_docs_args/2, validate_args/3]). -export([upgrade_mrargs/1]). -compile({inline, [{doc_id_and_rev,1}]}). @@ -248,6 +249,26 @@ is_partitioned(Db) -> couch_db:is_partitioned(Db). +validate_all_docs_args(DbName, Args) when is_binary(DbName) -> + Shards = mem3:shards(fabric:dbname(DbName)), + Db = open_cluster_db(hd(Shards)), + validate_all_docs_args(Db, Args); + +validate_all_docs_args(Db, Args) -> + true = couch_db:is_clustered(Db), + couch_mrview_util:validate_all_docs_args(Db, Args). + + +validate_args(DbName, DDoc, Args) when is_binary(DbName) -> + Shards = mem3:shards(fabric:dbname(DbName)), + Db = open_cluster_db(hd(Shards)), + validate_args(Db, DDoc, Args); + +validate_args(Db, DDoc, Args) -> + true = couch_db:is_clustered(Db), + couch_mrview_util:validate_args(Db, DDoc, Args). 
+ + upgrade_mrargs(#mrargs{} = Args) -> Args; diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index 69f42909a..0ba980a65 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -128,8 +128,11 @@ maybe_send_row(State) -> try get_next_row(State) of {_, NewState} when Skip > 0 -> maybe_send_row(NewState#collector{skip=Skip-1}); - {Row, NewState} -> - case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of + {Row0, NewState} -> + Row1 = possibly_embed_doc(NewState, Row0), + Row2 = detach_partition(Row1), + Row3 = transform_row(Row2), + case Callback(Row3, AccIn) of {stop, Acc} -> {stop, NewState#collector{user_acc=Acc, limit=Limit-1}}; {ok, Acc} -> @@ -194,6 +197,10 @@ possibly_embed_doc(#collector{db_name=DbName, query_args=Args}, _ -> Row end. +detach_partition(#view_row{key={p, _Partition, Key}} = Row) -> + Row#view_row{key = Key}; +detach_partition(#view_row{} = Row) -> + Row. keydict(undefined) -> undefined; @@ -309,10 +316,26 @@ index_of(X, [X|_Rest], I) -> index_of(X, [_|Rest], I) -> index_of(X, Rest, I+1). -get_shards(DbName, #mrargs{stable=true}) -> - mem3:ushards(DbName); -get_shards(DbName, #mrargs{stable=false}) -> - mem3:shards(DbName). +get_shards(Db, #mrargs{} = Args) -> + DbPartitioned = fabric_util:is_partitioned(Db), + Partition = couch_mrview_util:get_extra(Args, partition), + if DbPartitioned orelse Partition == undefined -> ok; true -> + throw({bad_request, <<"partition specified on non-partitioned db">>}) + end, + DbName = fabric:dbname(Db), + % Decide which version of mem3:shards/1,2 or + % mem3:ushards/1,2 to use for the current + % request. + case {Args#mrargs.stable, Partition} of + {true, undefined} -> + mem3:ushards(DbName); + {true, Partition} -> + mem3:ushards(DbName, couch_partition:shard_key(Partition)); + {false, undefined} -> + mem3:shards(DbName); + {false, Partition} -> + mem3:shards(DbName, couch_partition:shard_key(Partition)) + end. 
maybe_update_others(DbName, DDoc, ShardsInvolved, ViewName, #mrargs{update=lazy} = Args) -> diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index a404125fa..263538f65 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -20,8 +20,9 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). -go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) -> - Shards = mem3:shards(DbName), +go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) -> + DbName = fabric:dbname(Db), + Shards = shards(Db, QueryArgs), Workers0 = fabric_util:submit_jobs( Shards, fabric_rpc, all_docs, [Options, QueryArgs]), RexiMon = fabric_util:create_monitors(Workers0), diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index ee51bfe74..0f5e8bb23 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -24,8 +24,9 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo); -go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) -> - Shards = fabric_view:get_shards(DbName, Args), +go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) -> + DbName = fabric:dbname(Db), + Shards = fabric_view:get_shards(Db, Args), DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index b2b8a05f0..84b9bba64 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -23,10 +23,11 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) - {ok, DDoc} = 
fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), go(DbName, DDoc, View, Args, Callback, Acc0, VInfo); -go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) -> +go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> + DbName = fabric:dbname(Db), DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), RPCArgs = [DocIdAndRev, VName, Args], - Shards = fabric_view:get_shards(DbName, Args), + Shards = fabric_view:get_shards(Db, Args), fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), StartFun = fun(Shard) -> |