diff options
author | Robert Newson <rnewson@apache.org> | 2018-08-07 15:44:33 +0100 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2018-09-05 11:51:31 +0100 |
commit | d475110376b76bc2044c02894f7dc7ec13a34453 (patch) | |
tree | 8367a97e0e4cb6de02751b1e1a56e74463050c19 | |
parent | f78e3f8415d70ab2f4df3c2538d6ee4a428bfb47 (diff) | |
download | couchdb-d475110376b76bc2044c02894f7dc7ec13a34453.tar.gz |
implement partitioned views
Co-authored-by: Robert Newson <rnewson@apache.org>
Co-authored-by: Paul J. Davis <paul.joseph.davis@gmail.com>
-rw-r--r-- | src/couch/src/couch_btree.erl | 6 | ||||
-rw-r--r-- | src/couch/src/couch_ejson_compare.erl | 4 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview.erl | 2 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_updater.erl | 14 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_util.erl | 53 | ||||
-rw-r--r-- | src/fabric/src/fabric_view.erl | 18 |
6 files changed, 85 insertions, 12 deletions
diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl index ea224b1ab..8acefb26d 100644 --- a/src/couch/src/couch_btree.erl +++ b/src/couch/src/couch_btree.erl @@ -133,7 +133,9 @@ make_group_fun(Bt, exact) -> end; make_group_fun(Bt, GroupLevel) when is_integer(GroupLevel), GroupLevel > 0 -> fun - ({[_|_] = Key1, _}, {[_|_] = Key2, _}) -> + GF({{p, _Partition, Key1}, Val1}, {{p, _Partition, Key2}, Val2}) -> + GF({Key1, Val1}, {Key2, Val2}); + GF({[_|_] = Key1, _}, {[_|_] = Key2, _}) -> SL1 = lists:sublist(Key1, GroupLevel), SL2 = lists:sublist(Key2, GroupLevel), case less(Bt, {SL1, nil}, {SL2, nil}) of @@ -147,7 +149,7 @@ make_group_fun(Bt, GroupLevel) when is_integer(GroupLevel), GroupLevel > 0 -> _ -> false end; - ({Key1, _}, {Key2, _}) -> + GF({Key1, _}, {Key2, _}) -> case less(Bt, {Key1, nil}, {Key2, nil}) of false -> case less(Bt, {Key2, nil}, {Key1, nil}) of diff --git a/src/couch/src/couch_ejson_compare.erl b/src/couch/src/couch_ejson_compare.erl index 81adbb8f5..ca36c8656 100644 --- a/src/couch/src/couch_ejson_compare.erl +++ b/src/couch/src/couch_ejson_compare.erl @@ -22,6 +22,10 @@ init() -> Dir = code:priv_dir(couch), ok = erlang:load_nif(filename:join(Dir, ?MODULE), NumScheds). +% partitioned row comparison +less({p, PA, A}, {p, PB, B}) -> + less([PA, A], [PB, B]); + less(A, B) -> try less_nif(A, B) diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl index db467f081..09945f555 100644 --- a/src/couch_mrview/src/couch_mrview.erl +++ b/src/couch_mrview/src/couch_mrview.erl @@ -614,6 +614,8 @@ red_fold(Db, {NthRed, _Lang, View}=RedView, Args, Callback, UAcc) -> end, Acc, OptList), finish_fold(Acc2, []). +red_fold({p, _Partition, Key}, Red, Acc) -> + red_fold(Key, Red, Acc); red_fold(_Key, _Red, #mracc{skip=N}=Acc) when N > 0 -> {ok, Acc#mracc{skip=N-1, last_go=ok}}; red_fold(Key, Red, #mracc{meta_sent=false}=Acc) -> diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl index 214f48793..2b69eee0f 100644 --- a/src/couch_mrview/src/couch_mrview_updater.erl +++ b/src/couch_mrview/src/couch_mrview_updater.erl @@ -311,9 +311,11 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) -> #mrst{ id_btree=IdBtree, log_btree=LogBtree, - first_build=FirstBuild + first_build=FirstBuild, + design_opts=DesignOpts } = State, + Partitioned = couch_util:get_value(<<"partitioned">>, DesignOpts, false), Revs = dict:from_list(dict:fetch_keys(Log0)), Log = dict:fold(fun({Id, _Rev}, DIKeys, Acc) -> @@ -328,8 +330,9 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) -> _ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild) end, - UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) -> + UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs0, SKVs}}) -> #mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View, + KVs = if Partitioned -> inject_partition(KVs0); true -> KVs0 end, ToRem = couch_util:dict_find(ViewId, ToRemByView, []), {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem), NewUpdateSeq = case VBtree2 =/= View#mrview.btree of @@ -378,6 +381,13 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) -> log_btree=LogBtree2 }. +inject_partition(KVs) -> + [{{{p, partition(DocId), Key}, DocId}, Value} || {{Key, DocId}, Value} <- KVs]. + +partition(DocId) -> + [Partition, _Rest] = binary:split(DocId, <<":">>), + Partition. + update_id_btree(Btree, DocIdKeys, true) -> ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []], couch_btree:query_modify(Btree, [], ToAdd, []); diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index 592bfb518..574aac70e 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -38,6 +38,9 @@ -define(MOD, couch_mrview_index). -define(GET_VIEW_RETRY_COUNT, 1). -define(GET_VIEW_RETRY_DELAY, 50). +-define(LOWEST_KEY, null). +-define(HIGHEST_KEY, {[{<<239, 191, 176>>, null}]}). % is {"\ufff0": null} + -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). @@ -226,11 +229,12 @@ view_sig(_Db, State, View, Args0) -> PurgeSeq = View#mrview.purge_seq, SeqIndexed = View#mrview.seq_indexed, KeySeqIndexed = View#mrview.keyseq_indexed, + Partitioned = get_extra(Args0, partitioned, false), Args = Args0#mrargs{ preflight_fun=undefined, extra=[] }, - Term = view_sig_term(Sig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args), + Term = view_sig_term(Sig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Partitioned, Args), couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(Term))). view_sig_term(BaseSig, UpdateSeq, PurgeSeq, false, false) -> @@ -238,10 +242,12 @@ view_sig_term(BaseSig, UpdateSeq, PurgeSeq, false, false) -> view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed) -> {BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed}. -view_sig_term(BaseSig, UpdateSeq, PurgeSeq, false, false, Args) -> +view_sig_term(BaseSig, UpdateSeq, PurgeSeq, false, false, false, Args) -> {BaseSig, UpdateSeq, PurgeSeq, Args}; -view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args) -> - {BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args}. +view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, false, Args) -> + {BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args}; +view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Partitioned, Args) -> + {BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Partitioned, Args}. init_state(Db, Fd, #mrst{views=Views}=State, nil) -> @@ -588,7 +594,12 @@ validate_args(Args) -> mrverror(<<"`partition` parameter is not supported in this view.">>) end, - Args#mrargs{ + Args1 = case get_extra(Args, partitioned, false) of + true -> apply_partition(Args); + false -> Args + end, + + Args1#mrargs{ start_key_docid=SKDocId, end_key_docid=EKDocId, group_level=GroupLevel @@ -606,6 +617,38 @@ determine_group_level(#mrargs{group=true, group_level=undefined}) -> determine_group_level(#mrargs{group_level=GroupLevel}) -> GroupLevel. +apply_partition(#mrargs{} = Args0) -> + Partition = get_extra(Args0, partition), + apply_partition(Partition, Args0). + +apply_partition(_Partition, #mrargs{keys=[{p, _, _} | _]} = Args) -> + Args; % already applied + +apply_partition(Partition, #mrargs{keys=Keys} = Args) when Keys /= undefined -> + Args#mrargs{keys=[{p, Partition, K} || K <- Keys]}; + +apply_partition(_Partition, #mrargs{start_key={p, _, _}, end_key={p, _, _}} = Args) -> + Args; % already applied. + +apply_partition(Partition, Args) -> + #mrargs{ + direction = Dir, + start_key = StartKey, + end_key = EndKey + } = Args, + + {DefSK, DefEK} = case Dir of + fwd -> {?LOWEST_KEY, ?HIGHEST_KEY}; + rev -> {?HIGHEST_KEY, ?LOWEST_KEY} + end, + + SK0 = if StartKey /= undefined -> StartKey; true -> DefSK end, + EK0 = if EndKey /= undefined -> EndKey; true -> DefEK end, + + Args#mrargs{ + start_key = {p, Partition, SK0}, + end_key = {p, Partition, EK0} + }. check_range(#mrargs{start_key=undefined}, _Cmp) -> ok; diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index b4b8a8c38..844b44dfd 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -119,8 +119,10 @@ maybe_send_row(State) -> counters = Counters, skip = Skip, limit = Limit, - user_acc = AccIn + user_acc = AccIn, + query_args = QueryArgs } = State, + Partitioned = couch_mrview_util:get_extra(QueryArgs, partitioned, false), case fabric_dict:any(0, Counters) of true -> {ok, State}; @@ -128,8 +130,14 @@ maybe_send_row(State) -> try get_next_row(State) of {_, NewState} when Skip > 0 -> maybe_send_row(NewState#collector{skip=Skip-1}); - {Row, NewState} -> - case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of + {Row0, NewState} -> + Row1 = possibly_embed_doc(NewState, Row0), + Row2 = if + Partitioned -> detach_partition(Row1); + true -> Row1 + end, + Row3 = transform_row(Row2), + case Callback(Row3, AccIn) of {stop, Acc} -> {stop, NewState#collector{user_acc=Acc, limit=Limit-1}}; {ok, Acc} -> @@ -194,6 +202,10 @@ possibly_embed_doc(#collector{db_name=DbName, query_args=Args}, _ -> Row end. +detach_partition(#view_row{key={p, _Partition, Key}} = Row) -> + Row#view_row{key = Key}; +detach_partition(#view_row{} = Row) -> + Row. keydict(undefined) -> undefined; |