summary refs log tree commit diff
diff options
context:
space:
mode:
author Robert Newson <rnewson@apache.org> 2018-08-07 15:44:33 +0100
committer Robert Newson <rnewson@apache.org> 2018-09-03 17:43:04 +0100
commit c3f354eaf9931c903e1d3e33df161269a60a2edc (patch)
tree c1bd703c32f78309016f9405c57b1c3a6b0ea539
parent f78e3f8415d70ab2f4df3c2538d6ee4a428bfb47 (diff)
download couchdb-c3f354eaf9931c903e1d3e33df161269a60a2edc.tar.gz
implement partitioned views
Co-authored-by: Robert Newson <rnewson@apache.org>
Co-authored-by: Paul J. Davis <paul.joseph.davis@gmail.com>
-rw-r--r-- src/couch/src/couch_btree.erl 14
-rw-r--r-- src/couch/src/couch_ejson_compare.erl 4
-rw-r--r-- src/couch_mrview/src/couch_mrview.erl 2
-rw-r--r-- src/couch_mrview/src/couch_mrview_updater.erl 14
-rw-r--r-- src/couch_mrview/src/couch_mrview_util.erl 53
-rw-r--r-- src/fabric/src/fabric_view.erl 18
6 files changed, 95 insertions, 10 deletions
diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl
index ea224b1ab..d11d7e601 100644
--- a/src/couch/src/couch_btree.erl
+++ b/src/couch/src/couch_btree.erl
@@ -133,6 +133,20 @@ make_group_fun(Bt, exact) ->
end;
make_group_fun(Bt, GroupLevel) when is_integer(GroupLevel), GroupLevel > 0 ->
fun
+ ({{p, _Partition, Key1}, _}, {{p, _Partition, Key2}, _}) ->
+ SL1 = lists:sublist(Key1, GroupLevel),
+ SL2 = lists:sublist(Key2, GroupLevel),
+ case less(Bt, {SL1, nil}, {SL2, nil}) of
+ false ->
+ case less(Bt, {SL2, nil}, {SL1, nil}) of
+ false ->
+ true;
+ _ ->
+ false
+ end;
+ _ ->
+ false
+ end;
({[_|_] = Key1, _}, {[_|_] = Key2, _}) ->
SL1 = lists:sublist(Key1, GroupLevel),
SL2 = lists:sublist(Key2, GroupLevel),
diff --git a/src/couch/src/couch_ejson_compare.erl b/src/couch/src/couch_ejson_compare.erl
index 81adbb8f5..ca36c8656 100644
--- a/src/couch/src/couch_ejson_compare.erl
+++ b/src/couch/src/couch_ejson_compare.erl
@@ -22,6 +22,10 @@ init() ->
Dir = code:priv_dir(couch),
ok = erlang:load_nif(filename:join(Dir, ?MODULE), NumScheds).
+% partitioned row comparison
+less({p, PA, A}, {p, PB, B}) ->
+ less([PA, A], [PB, B]);
+
less(A, B) ->
try
less_nif(A, B)
diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl
index db467f081..09945f555 100644
--- a/src/couch_mrview/src/couch_mrview.erl
+++ b/src/couch_mrview/src/couch_mrview.erl
@@ -614,6 +614,8 @@ red_fold(Db, {NthRed, _Lang, View}=RedView, Args, Callback, UAcc) ->
end, Acc, OptList),
finish_fold(Acc2, []).
+red_fold({p, _Partition, Key}, Red, Acc) ->
+ red_fold(Key, Red, Acc);
red_fold(_Key, _Red, #mracc{skip=N}=Acc) when N > 0 ->
{ok, Acc#mracc{skip=N-1, last_go=ok}};
red_fold(Key, Red, #mracc{meta_sent=false}=Acc) ->
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 214f48793..2b69eee0f 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -311,9 +311,11 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
#mrst{
id_btree=IdBtree,
log_btree=LogBtree,
- first_build=FirstBuild
+ first_build=FirstBuild,
+ design_opts=DesignOpts
} = State,
+ Partitioned = couch_util:get_value(<<"partitioned">>, DesignOpts, false),
Revs = dict:from_list(dict:fetch_keys(Log0)),
Log = dict:fold(fun({Id, _Rev}, DIKeys, Acc) ->
@@ -328,8 +330,9 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
_ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild)
end,
- UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) ->
+ UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs0, SKVs}}) ->
#mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View,
+ KVs = if Partitioned -> inject_partition(KVs0); true -> KVs0 end,
ToRem = couch_util:dict_find(ViewId, ToRemByView, []),
{ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem),
NewUpdateSeq = case VBtree2 =/= View#mrview.btree of
@@ -378,6 +381,13 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
log_btree=LogBtree2
}.
+% Rewrite view rows so that each key carries the partition of the document
+% that produced it: {{Key, DocId}, Value} becomes
+% {{{p, Partition, Key}, DocId}, Value}, with Partition = partition(DocId).
+% The generator pattern means list elements that are not of the shape
+% {{Key, DocId}, Value} are dropped from the result rather than raising.
+inject_partition(KVs) ->
+ [{{{p, partition(DocId), Key}, DocId}, Value} || {{Key, DocId}, Value} <- KVs].
+
+% Return the part of DocId that precedes the first ":".
+% binary:split/2 is called without the `global` option, so it splits at the
+% first match only; any later ":" stays in the discarded remainder.
+% NOTE(review): for a DocId containing no ":" binary:split/2 returns a
+% one-element list, so the two-element match below fails with badmatch --
+% confirm that callers only pass ids of the form <<"partition:rest">>.
+partition(DocId) ->
+ [Partition, _Rest] = binary:split(DocId, <<":">>),
+ Partition.
+
update_id_btree(Btree, DocIdKeys, true) ->
ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
couch_btree:query_modify(Btree, [], ToAdd, []);
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 592bfb518..574aac70e 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -38,6 +38,9 @@
-define(MOD, couch_mrview_index).
-define(GET_VIEW_RETRY_COUNT, 1).
-define(GET_VIEW_RETRY_DELAY, 50).
+% Default range bounds substituted by apply_partition/2 when a query does not
+% supply start_key / end_key (swapped when the direction is rev).
+% NOTE(review): presumably chosen to sort below / above every key a view can
+% emit under view collation -- confirm against the collation rules.
+-define(LOWEST_KEY, null).
+-define(HIGHEST_KEY, {[{<<239, 191, 176>>, null}]}). % is {"\ufff0": null}
+
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -226,11 +229,12 @@ view_sig(_Db, State, View, Args0) ->
PurgeSeq = View#mrview.purge_seq,
SeqIndexed = View#mrview.seq_indexed,
KeySeqIndexed = View#mrview.keyseq_indexed,
+ Partitioned = get_extra(Args0, partitioned, false),
Args = Args0#mrargs{
preflight_fun=undefined,
extra=[]
},
- Term = view_sig_term(Sig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args),
+ Term = view_sig_term(Sig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Partitioned, Args),
couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(Term))).
view_sig_term(BaseSig, UpdateSeq, PurgeSeq, false, false) ->
@@ -238,10 +242,12 @@ view_sig_term(BaseSig, UpdateSeq, PurgeSeq, false, false) ->
view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed) ->
{BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed}.
-view_sig_term(BaseSig, UpdateSeq, PurgeSeq, false, false, Args) ->
+view_sig_term(BaseSig, UpdateSeq, PurgeSeq, false, false, false, Args) ->
{BaseSig, UpdateSeq, PurgeSeq, Args};
-view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args) ->
- {BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args}.
+view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, false, Args) ->
+ {BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args};
+view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Partitioned, Args) ->
+ {BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Partitioned, Args}.
init_state(Db, Fd, #mrst{views=Views}=State, nil) ->
@@ -588,7 +594,12 @@ validate_args(Args) ->
mrverror(<<"`partition` parameter is not supported in this view.">>)
end,
- Args#mrargs{
+ Args1 = case get_extra(Args, partitioned, false) of
+ true -> apply_partition(Args);
+ false -> Args
+ end,
+
+ Args1#mrargs{
start_key_docid=SKDocId,
end_key_docid=EKDocId,
group_level=GroupLevel
@@ -606,6 +617,38 @@ determine_group_level(#mrargs{group=true, group_level=undefined}) ->
determine_group_level(#mrargs{group_level=GroupLevel}) ->
GroupLevel.
+% Read the `partition` entry from the query's extra options and use it to
+% wrap the query's keys or key range via apply_partition/2.
+% NOTE(review): get_extra/2 is defined outside this hunk; what it returns when
+% no partition was supplied is not visible here.
+apply_partition(#mrargs{} = Args0) ->
+ Partition = get_extra(Args0, partition),
+ apply_partition(Partition, Args0).
+
+% Scope a query to one partition by wrapping its keys as {p, Partition, Key}.
+%
+% Clauses, in order:
+%   1. keys is a list whose first element is already a {p, _, _} tuple:
+%      return Args unchanged.
+%   2. keys is anything other than undefined: wrap every key.
+%   3. start_key and end_key are both already {p, _, _} tuples:
+%      return Args unchanged.
+%   4. otherwise: fill in a default for whichever of start_key / end_key is
+%      undefined, then wrap both.
+%
+% Returns the updated #mrargs{} record.
+apply_partition(_Partition, #mrargs{keys=[{p, _, _} | _]} = Args) ->
+ Args; % already applied
+
+apply_partition(Partition, #mrargs{keys=Keys} = Args) when Keys /= undefined ->
+ Args#mrargs{keys=[{p, Partition, K} || K <- Keys]};
+
+apply_partition(_Partition, #mrargs{start_key={p, _, _}, end_key={p, _, _}} = Args) ->
+ Args; % already applied.
+
+apply_partition(Partition, Args) ->
+ #mrargs{
+ direction = Dir,
+ start_key = StartKey,
+ end_key = EndKey
+ } = Args,
+
+ % The defaults follow the scan direction: a forward scan runs from
+ % LOWEST_KEY to HIGHEST_KEY, a reverse scan the other way round. Any
+ % direction other than fwd / rev raises case_clause here.
+ {DefSK, DefEK} = case Dir of
+ fwd -> {?LOWEST_KEY, ?HIGHEST_KEY};
+ rev -> {?HIGHEST_KEY, ?LOWEST_KEY}
+ end,
+
+ % Keep bounds the caller supplied; only undefined ones get a default.
+ SK0 = if StartKey /= undefined -> StartKey; true -> DefSK end,
+ EK0 = if EndKey /= undefined -> EndKey; true -> DefEK end,
+
+ Args#mrargs{
+ start_key = {p, Partition, SK0},
+ end_key = {p, Partition, EK0}
+ }.
check_range(#mrargs{start_key=undefined}, _Cmp) ->
ok;
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index b4b8a8c38..844b44dfd 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -119,8 +119,10 @@ maybe_send_row(State) ->
counters = Counters,
skip = Skip,
limit = Limit,
- user_acc = AccIn
+ user_acc = AccIn,
+ query_args = QueryArgs
} = State,
+ Partitioned = couch_mrview_util:get_extra(QueryArgs, partitioned, false),
case fabric_dict:any(0, Counters) of
true ->
{ok, State};
@@ -128,8 +130,14 @@ maybe_send_row(State) ->
try get_next_row(State) of
{_, NewState} when Skip > 0 ->
maybe_send_row(NewState#collector{skip=Skip-1});
- {Row, NewState} ->
- case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of
+ {Row0, NewState} ->
+ Row1 = possibly_embed_doc(NewState, Row0),
+ Row2 = if
+ Partitioned -> detach_partition(Row1);
+ true -> Row1
+ end,
+ Row3 = transform_row(Row2),
+ case Callback(Row3, AccIn) of
{stop, Acc} ->
{stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
{ok, Acc} ->
@@ -194,6 +202,10 @@ possibly_embed_doc(#collector{db_name=DbName, query_args=Args},
_ -> Row
end.
+% Strip the partition wrapper from a row key: a key of the form
+% {p, _Partition, Key} is replaced by the bare Key. Rows whose key has any
+% other shape are returned unchanged.
+detach_partition(#view_row{key={p, _Partition, Key}} = Row) ->
+ Row#view_row{key = Key};
+detach_partition(#view_row{} = Row) ->
+ Row.
keydict(undefined) ->
undefined;