summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2018-07-09 18:08:57 +0100
committerRobert Newson <rnewson@apache.org>2018-07-10 15:30:14 +0100
commit9c8e4afe7fff33d5a0ace8568fa330961e75b67f (patch)
tree72b0e9ac10d8be373825432792e9769f18253f3a
parent28bb00bc2677b7cc63cd947b9114152ba9e2e24a (diff)
downloadcouchdb-9c8e4afe7fff33d5a0ace8568fa330961e75b67f.tar.gz
WIP support user-partitioned views
-rw-r--r--src/couch_mrview/include/couch_mrview.hrl4
-rw-r--r--src/couch_mrview/src/couch_mrview_http.erl2
-rw-r--r--src/couch_mrview/src/couch_mrview_updater.erl15
-rw-r--r--src/couch_mrview/src/couch_mrview_util.erl66
4 files changed, 78 insertions, 9 deletions
diff --git a/src/couch_mrview/include/couch_mrview.hrl b/src/couch_mrview/include/couch_mrview.hrl
index a341e30db..67b3cd987 100644
--- a/src/couch_mrview/include/couch_mrview.hrl
+++ b/src/couch_mrview/include/couch_mrview.hrl
@@ -31,7 +31,8 @@
doc_acc,
doc_queue,
write_queue,
- qserver=nil
+ qserver=nil,
+ partitioned=false
}).
@@ -87,6 +88,7 @@
conflicts,
callback,
sorted = true,
+ partition_key,
extra = []
}).
diff --git a/src/couch_mrview/src/couch_mrview_http.erl b/src/couch_mrview/src/couch_mrview_http.erl
index 9dae1d86c..5ff7285aa 100644
--- a/src/couch_mrview/src/couch_mrview_http.erl
+++ b/src/couch_mrview/src/couch_mrview_http.erl
@@ -582,6 +582,8 @@ parse_param(Key, Val, Args, IsDecoded) ->
Args#mrargs{callback=couch_util:to_binary(Val)};
"sorted" ->
Args#mrargs{sorted=parse_boolean(Val)};
+ "partition_key" ->
+ Args#mrargs{partition_key=couch_util:to_binary(Val)};
_ ->
BKey = couch_util:to_binary(Key),
BVal = couch_util:to_binary(Val),
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 214f48793..e1cc280ff 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -311,7 +311,8 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
#mrst{
id_btree=IdBtree,
log_btree=LogBtree,
- first_build=FirstBuild
+ first_build=FirstBuild,
+ partitioned=Partitioned
} = State,
Revs = dict:from_list(dict:fetch_keys(Log0)),
@@ -328,8 +329,15 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
_ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild)
end,
- UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) ->
+ UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs0, SKVs}}) ->
#mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View,
+ KVs = case Partitioned of
+ true ->
+ [{{[partition(D), K], D}, V} || {{K, D}, V} <- KVs0];
+ false ->
+ KVs0
+ end,
+
ToRem = couch_util:dict_find(ViewId, ToRemByView, []),
{ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem),
NewUpdateSeq = case VBtree2 =/= View#mrview.btree of
@@ -484,3 +492,6 @@ maybe_notify(State, View, KVs, ToRem) ->
[Key || {Key, _DocId} <- ToRem]
end,
couch_index_plugin:index_update(State, View, Updated, Removed).
+
+partition(DocId) ->
+ hd(binary:split(DocId, <<":">>)).
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 086bf9bbf..35554c4c8 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -24,7 +24,7 @@
-export([temp_view_to_ddoc/1]).
-export([calculate_external_size/1]).
-export([calculate_active_size/1]).
--export([validate_and_update_args/1]).
+-export([validate_and_update_args/1, validate_and_update_args/2]).
-export([maybe_load_doc/3, maybe_load_doc/4]).
-export([maybe_update_index_file/1]).
-export([extract_view/4, extract_view_reduce/1]).
@@ -33,6 +33,7 @@
-export([changes_key_opts/2]).
-export([fold_changes/4]).
-export([to_key_seq/1]).
+-export([partition_key/2, unpartition_key/1]).
-define(MOD, couch_mrview_index).
-define(GET_VIEW_RETRY_COUNT, 1).
@@ -281,6 +282,12 @@ init_state(Db, Fd, State, Header) ->
OpenViewFun = fun(St, View) -> open_view(Db, Fd, Lang, St, View) end,
Views2 = lists:zipwith(OpenViewFun, ViewStates, Views),
+ Partitioned = case couch_db_engine:get_prop(Db, partitioned) of
+ {ok, true} -> true;
+ {ok, false} -> false;
+ {error, no_value} -> false
+ end,
+
State#mrst{
fd=Fd,
fd_monitor=erlang:monitor(process, Fd),
@@ -288,7 +295,8 @@ init_state(Db, Fd, State, Header) ->
purge_seq=PurgeSeq,
id_btree=IdBtree,
log_btree=LogBtree,
- views=Views2
+ views=Views2,
+ partitioned=Partitioned
}.
open_view(_Db, Fd, Lang, ViewState, View) ->
@@ -441,7 +449,7 @@ fold_reduce({NthRed, Lang, View}, Fun, Acc, Options) ->
couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options).
-validate_args(Args) ->
+validate_args(Args, Options) ->
GroupLevel = determine_group_level(Args),
Reduce = Args#mrargs.reduce,
case Reduce == undefined orelse is_boolean(Reduce) of
@@ -551,10 +559,21 @@ validate_args(Args) ->
_ -> mrverror(<<"Invalid value for `sorted`.">>)
end,
+ case {lists:member(partitioned, Options), Args#mrargs.partition_key} of
+ {true, undefined} ->
+ mrverror(<<"`partition_key` parameter is mandatory for queries to this database.">>);
+ {true, _PartitionKey} ->
+ ok;
+ {false, undefined} ->
+ ok;
+ {false, _PartitionKey} ->
+ mrverror(<<"`partition_key` parameter is not supported in this database.">>)
+ end,
+
true.
-update_args(#mrargs{} = Args) ->
+update_args(#mrargs{} = Args, _Options) ->
GroupLevel = determine_group_level(Args),
SKDocId = case {Args#mrargs.direction, Args#mrargs.start_key_docid} of
@@ -569,7 +588,32 @@ update_args(#mrargs{} = Args) ->
{_, EKDocId1} -> EKDocId1
end,
+ LowestKey = null,
+ HighestKey = {[{<<239, 191, 176>>, null}]}, % \ufff0
+
+ {StartKey, EndKey} = case Args of
+ #mrargs{partition_key=undefined} ->
+ {Args#mrargs.start_key, Args#mrargs.end_key};
+
+ #mrargs{partition_key=PKey0} when not is_binary(PKey0) ->
+ mrverror(<<"`partition_key` must be a string.">>);
+
+ #mrargs{partition_key=PKey0, start_key=undefined, end_key=undefined} ->
+ {[PKey0, LowestKey], [PKey0, HighestKey]};
+
+ #mrargs{partition_key=PKey0, start_key=SK0, end_key=undefined} ->
+ {[PKey0, SK0], [PKey0, HighestKey]};
+
+ #mrargs{partition_key=PKey0, start_key=undefined, end_key=EK0} ->
+ {[PKey0, LowestKey], [PKey0, EK0]};
+
+ #mrargs{partition_key=PKey0, start_key=SK0, end_key=EK0} ->
+ {[PKey0, SK0], [PKey0, EK0]}
+ end,
+
Args#mrargs{
+ start_key=StartKey,
+ end_key=EndKey,
start_key_docid=SKDocId,
end_key_docid=EKDocId,
group_level=GroupLevel
@@ -577,8 +621,11 @@ update_args(#mrargs{} = Args) ->
validate_and_update_args(#mrargs{} = Args) ->
- true = validate_args(Args),
- update_args(Args).
+ validate_and_update_args(Args, []).
+
+validate_and_update_args(#mrargs{} = Args, Options) ->
+ true = validate_args(Args, Options),
+ update_args(Args, Options).
determine_group_level(#mrargs{group=undefined, group_level=undefined}) ->
@@ -1211,3 +1258,10 @@ kv_external_size(KVList, Reduction) ->
lists:foldl(fun([[Key, _], Value], Acc) ->
?term_size(Key) + ?term_size(Value) + Acc
end, ?term_size(Reduction), KVList).
+
+
+partition_key(Key, DocId) ->
+ [hd(binary:split(DocId, <<":">>)), Key].
+
+unpartition_key([_Partition, Key]) ->
+ Key.