summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2018-08-07 15:44:33 +0100
committerRobert Newson <rnewson@apache.org>2018-08-08 11:45:20 +0100
commit23683ab079a3a7f01d2f94d93c30ab5726f4f5e1 (patch)
tree034ca4511d8e84674cee755e30cca4a0c3b0d0bd
parent5d54d40cf070012d5e299803e237161757809aa2 (diff)
downloadcouchdb-23683ab079a3a7f01d2f94d93c30ab5726f4f5e1.tar.gz
implement partitioned views
-rw-r--r--src/couch_mrview/src/couch_mrview_updater.erl14
-rw-r--r--src/couch_mrview/src/couch_mrview_util.erl55
-rw-r--r--src/fabric/src/fabric_view.erl19
3 files changed, 81 insertions, 7 deletions
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 214f48793..bfaf13605 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -311,9 +311,11 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
#mrst{
id_btree=IdBtree,
log_btree=LogBtree,
- first_build=FirstBuild
+ first_build=FirstBuild,
+ design_opts=DesignOpts
} = State,
+ Partitioned = couch_util:get_value(<<"partitioned">>, DesignOpts, false),
Revs = dict:from_list(dict:fetch_keys(Log0)),
Log = dict:fold(fun({Id, _Rev}, DIKeys, Acc) ->
@@ -328,8 +330,9 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
_ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild)
end,
- UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) ->
+ UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs0, SKVs}}) ->
#mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View,
+ KVs = if Partitioned -> inject_partition(KVs0); true -> KVs0 end,
ToRem = couch_util:dict_find(ViewId, ToRemByView, []),
{ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem),
NewUpdateSeq = case VBtree2 =/= View#mrview.btree of
@@ -378,6 +381,13 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
log_btree=LogBtree2
}.
+inject_partition(KVs) ->
+ [{{[partition(DocId), Key], DocId}, Value} || {{Key, DocId}, Value} <- KVs].
+
+partition(DocId) ->
+ [Partition, _Rest] = binary:split(DocId, <<":">>),
+ Partition.
+
update_id_btree(Btree, DocIdKeys, true) ->
ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
couch_btree:query_modify(Btree, [], ToAdd, []);
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 592bfb518..b1e3e227e 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -38,6 +38,9 @@
-define(MOD, couch_mrview_index).
-define(GET_VIEW_RETRY_COUNT, 1).
-define(GET_VIEW_RETRY_DELAY, 50).
+-define(LOWEST_KEY, null).
+-define(HIGHEST_KEY, {[{<<239, 191, 176>>, null}]}). % is {"\ufff0": null}
+
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -135,6 +138,7 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
{DesignOpts} = proplists:get_value(<<"options">>, Fields, {[]}),
SeqIndexed = proplists:get_value(<<"seq_indexed">>, DesignOpts, false),
KeySeqIndexed = proplists:get_value(<<"keyseq_indexed">>, DesignOpts, false),
+ DesignOpts1 = add_partitioned_opt(DbName, DesignOpts),
{RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
BySrc = lists:foldl(MakeDict, dict:new(), RawViews),
@@ -153,7 +157,7 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
lib=Lib,
views=Views,
language=Language,
- design_opts=DesignOpts,
+ design_opts=DesignOpts1,
seq_indexed=SeqIndexed,
keyseq_indexed=KeySeqIndexed
},
@@ -161,6 +165,17 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
{ok, IdxState#mrst{sig=couch_hash:md5_hash(term_to_binary(SigInfo))}}.
+add_partitioned_opt(DbName, DesignOpts) ->
+ PartitionedOpt = proplists:get_value(<<"partitioned">>, DesignOpts),
+ Default = mem3:is_partitioned(DbName),
+ case {PartitionedOpt, Default} of
+ {undefined, true} ->
+ [{<<"partitioned">>, true} | DesignOpts];
+ _ ->
+ DesignOpts
+ end.
+
+
set_view_type(_Args, _ViewName, []) ->
throw({not_found, missing_named_view});
set_view_type(Args, ViewName, [View | Rest]) ->
@@ -588,7 +603,12 @@ validate_args(Args) ->
mrverror(<<"`partition` parameter is not supported in this view.">>)
end,
- Args#mrargs{
+ Args1 = case get_extra(Args, partitioned, false) of
+ true -> apply_partition(Args);
+ false -> Args
+ end,
+
+ Args1#mrargs{
start_key_docid=SKDocId,
end_key_docid=EKDocId,
group_level=GroupLevel
@@ -606,6 +626,37 @@ determine_group_level(#mrargs{group=true, group_level=undefined}) ->
determine_group_level(#mrargs{group_level=GroupLevel}) ->
GroupLevel.
+apply_partition(#mrargs{} = Args0) ->
+ case get_extra(Args0, partition_applied, false) of
+ true ->
+ Args0;
+ false ->
+ Partition = get_extra(Args0, partition),
+ Args1 = apply_partition(Partition, Args0),
+ set_extra(Args1, partition_applied, true)
+ end.
+
+apply_partition(Partition, #mrargs{direction=fwd, start_key=undefined, end_key=undefined} = Args) ->
+ Args#mrargs{start_key=[Partition, ?LOWEST_KEY], end_key=[Partition, ?HIGHEST_KEY]};
+
+apply_partition(Partition, #mrargs{direction=rev, start_key=undefined, end_key=undefined} = Args) ->
+ Args#mrargs{start_key=[Partition, ?HIGHEST_KEY], end_key=[Partition, ?LOWEST_KEY]};
+
+apply_partition(Partition, #mrargs{direction=fwd, start_key=SK0, end_key=undefined} = Args) ->
+ Args#mrargs{start_key=[Partition, SK0], end_key=[Partition, ?HIGHEST_KEY]};
+
+apply_partition(Partition, #mrargs{direction=rev, start_key=SK0, end_key=undefined} = Args) ->
+ Args#mrargs{start_key=[Partition, SK0], end_key=[Partition, ?LOWEST_KEY]};
+
+apply_partition(Partition, #mrargs{direction=fwd, start_key=undefined, end_key=EK0} = Args) ->
+ Args#mrargs{start_key=[Partition, ?LOWEST_KEY], end_key=[Partition, EK0]};
+
+apply_partition(Partition, #mrargs{direction=rev, start_key=undefined, end_key=EK0} = Args) ->
+ Args#mrargs{start_key=[Partition, ?HIGHEST_KEY], end_key=[Partition, EK0]};
+
+apply_partition(Partition, #mrargs{start_key=SK0, end_key=EK0} = Args) ->
+ Args#mrargs{start_key=[Partition, SK0], end_key=[Partition, EK0]}.
+
check_range(#mrargs{start_key=undefined}, _Cmp) ->
ok;
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index eae4cd6f9..994c73940 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -119,8 +119,10 @@ maybe_send_row(State) ->
counters = Counters,
skip = Skip,
limit = Limit,
- user_acc = AccIn
+ user_acc = AccIn,
+ query_args = QueryArgs
} = State,
+ Partitioned = couch_mrview_util:get_extra(QueryArgs, partitioned, false),
case fabric_dict:any(0, Counters) of
true ->
{ok, State};
@@ -128,8 +130,14 @@ maybe_send_row(State) ->
try get_next_row(State) of
{_, NewState} when Skip > 0 ->
maybe_send_row(NewState#collector{skip=Skip-1});
- {Row, NewState} ->
- case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of
+ {Row0, NewState} ->
+ Row1 = possibly_embed_doc(NewState, Row0),
+ Row2 = if
+ Partitioned -> detach_partition(Row1);
+ true -> Row1
+ end,
+ Row3 = transform_row(Row2),
+ case Callback(Row3, AccIn) of
{stop, Acc} ->
{stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
{ok, Acc} ->
@@ -194,6 +202,11 @@ possibly_embed_doc(#collector{db_name=DbName, query_args=Args},
_ -> Row
end.
+detach_partition(#view_row{key=[_Partition, Key]} = Row) ->
+ Row#view_row{key = Key};
+detach_partition(#view_row{key=null} = Row) ->
+ Row#view_row{key = null}.
+
keydict(undefined) ->
undefined;