diff options
author | Robert Newson <rnewson@apache.org> | 2018-07-09 18:08:57 +0100 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2018-07-10 15:30:14 +0100 |
commit | 9c8e4afe7fff33d5a0ace8568fa330961e75b67f (patch) | |
tree | 72b0e9ac10d8be373825432792e9769f18253f3a | |
parent | 28bb00bc2677b7cc63cd947b9114152ba9e2e24a (diff) | |
download | couchdb-9c8e4afe7fff33d5a0ace8568fa330961e75b67f.tar.gz |
WIP support user-partitioned views
-rw-r--r-- | src/couch_mrview/include/couch_mrview.hrl | 4 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_http.erl | 2 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_updater.erl | 15 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_util.erl | 66 |
4 files changed, 78 insertions, 9 deletions
diff --git a/src/couch_mrview/include/couch_mrview.hrl b/src/couch_mrview/include/couch_mrview.hrl index a341e30db..67b3cd987 100644 --- a/src/couch_mrview/include/couch_mrview.hrl +++ b/src/couch_mrview/include/couch_mrview.hrl @@ -31,7 +31,8 @@ doc_acc, doc_queue, write_queue, - qserver=nil + qserver=nil, + partitioned=false }). @@ -87,6 +88,7 @@ conflicts, callback, sorted = true, + partition_key, extra = [] }). diff --git a/src/couch_mrview/src/couch_mrview_http.erl b/src/couch_mrview/src/couch_mrview_http.erl index 9dae1d86c..5ff7285aa 100644 --- a/src/couch_mrview/src/couch_mrview_http.erl +++ b/src/couch_mrview/src/couch_mrview_http.erl @@ -582,6 +582,8 @@ parse_param(Key, Val, Args, IsDecoded) -> Args#mrargs{callback=couch_util:to_binary(Val)}; "sorted" -> Args#mrargs{sorted=parse_boolean(Val)}; + "partition_key" -> + Args#mrargs{partition_key=couch_util:to_binary(Val)}; _ -> BKey = couch_util:to_binary(Key), BVal = couch_util:to_binary(Val), diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl index 214f48793..e1cc280ff 100644 --- a/src/couch_mrview/src/couch_mrview_updater.erl +++ b/src/couch_mrview/src/couch_mrview_updater.erl @@ -311,7 +311,8 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) -> #mrst{ id_btree=IdBtree, log_btree=LogBtree, - first_build=FirstBuild + first_build=FirstBuild, + partitioned=Partitioned } = State, Revs = dict:from_list(dict:fetch_keys(Log0)), @@ -328,8 +329,15 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) -> _ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild) end, - UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) -> + UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs0, SKVs}}) -> #mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View, + KVs = case Partitioned of + true -> + [{{[partition(D), K], D}, V} || {{K, D}, V} <- KVs0]; + false -> + KVs0 + end, + ToRem = couch_util:dict_find(ViewId, ToRemByView, []), {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem), NewUpdateSeq = case VBtree2 =/= View#mrview.btree of @@ -484,3 +492,6 @@ maybe_notify(State, View, KVs, ToRem) -> [Key || {Key, _DocId} <- ToRem] end, couch_index_plugin:index_update(State, View, Updated, Removed). + +partition(DocId) -> + hd(binary:split(DocId, <<":">>)). diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index 086bf9bbf..35554c4c8 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -24,7 +24,7 @@ -export([temp_view_to_ddoc/1]). -export([calculate_external_size/1]). -export([calculate_active_size/1]). --export([validate_and_update_args/1]). +-export([validate_and_update_args/1, validate_and_update_args/2]). -export([maybe_load_doc/3, maybe_load_doc/4]). -export([maybe_update_index_file/1]). -export([extract_view/4, extract_view_reduce/1]). @@ -33,6 +33,7 @@ -export([changes_key_opts/2]). -export([fold_changes/4]). -export([to_key_seq/1]). +-export([partition_key/2, unpartition_key/1]). -define(MOD, couch_mrview_index). -define(GET_VIEW_RETRY_COUNT, 1). @@ -281,6 +282,12 @@ init_state(Db, Fd, State, Header) -> OpenViewFun = fun(St, View) -> open_view(Db, Fd, Lang, St, View) end, Views2 = lists:zipwith(OpenViewFun, ViewStates, Views), + Partitioned = case couch_db_engine:get_prop(Db, partitioned) of + {ok, true} -> true; + {ok, false} -> false; + {error, no_value} -> false + end, + State#mrst{ fd=Fd, fd_monitor=erlang:monitor(process, Fd), @@ -288,7 +295,8 @@ init_state(Db, Fd, State, Header) -> purge_seq=PurgeSeq, id_btree=IdBtree, log_btree=LogBtree, - views=Views2 + views=Views2, + partitioned=Partitioned }. open_view(_Db, Fd, Lang, ViewState, View) -> @@ -441,7 +449,7 @@ fold_reduce({NthRed, Lang, View}, Fun, Acc, Options) -> couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options). -validate_args(Args) -> +validate_args(Args, Options) -> GroupLevel = determine_group_level(Args), Reduce = Args#mrargs.reduce, case Reduce == undefined orelse is_boolean(Reduce) of @@ -551,10 +559,21 @@ validate_args(Args) -> _ -> mrverror(<<"Invalid value for `sorted`.">>) end, + case {lists:member(partitioned, Options), Args#mrargs.partition_key} of + {true, undefined} -> + mrverror(<<"`partition_key` parameter is mandatory for queries to this database.">>); + {true, _PartitionKey} -> + ok; + {false, undefined} -> + ok; + {false, _PartitionKey} -> + mrverror(<<"`partition_key` parameter is not supported in this database.">>) + end, + true. -update_args(#mrargs{} = Args) -> +update_args(#mrargs{} = Args, _Options) -> GroupLevel = determine_group_level(Args), SKDocId = case {Args#mrargs.direction, Args#mrargs.start_key_docid} of @@ -569,7 +588,32 @@ update_args(#mrargs{} = Args) -> {_, EKDocId1} -> EKDocId1 end, + LowestKey = null, + HighestKey = {[{<<239, 191, 176>>, null}]}, % \ufff0 + + {StartKey, EndKey} = case Args of + #mrargs{partition_key=undefined} -> + {Args#mrargs.start_key, Args#mrargs.end_key}; + + #mrargs{partition_key=PKey0} when not is_binary(PKey0) -> + mrverror(<<"`partition_key` must be a string.">>); + + #mrargs{partition_key=PKey0, start_key=undefined, end_key=undefined} -> + {[PKey0, LowestKey], [PKey0, HighestKey]}; + + #mrargs{partition_key=PKey0, start_key=SK0, end_key=undefined} -> + {[PKey0, SK0], [PKey0, HighestKey]}; + + #mrargs{partition_key=PKey0, start_key=undefined, end_key=EK0} -> + {[PKey0, LowestKey], [PKey0, EK0]}; + + #mrargs{partition_key=PKey0, start_key=SK0, end_key=EK0} -> + {[PKey0, SK0], [PKey0, EK0]} + end, + Args#mrargs{ + start_key=StartKey, + end_key=EndKey, start_key_docid=SKDocId, end_key_docid=EKDocId, group_level=GroupLevel @@ -577,8 +621,11 @@ update_args(#mrargs{} = Args) -> validate_and_update_args(#mrargs{} = Args) -> - true = validate_args(Args), - update_args(Args). + validate_and_update_args(Args, []). + +validate_and_update_args(#mrargs{} = Args, Options) -> + true = validate_args(Args, Options), + update_args(Args, Options). determine_group_level(#mrargs{group=undefined, group_level=undefined}) -> @@ -1211,3 +1258,10 @@ kv_external_size(KVList, Reduction) -> lists:foldl(fun([[Key, _], Value], Acc) -> ?term_size(Key) + ?term_size(Value) + Acc end, ?term_size(Reduction), KVList). + + +partition_key(Key, DocId) -> + [hd(binary:split(DocId, <<":">>)), Key]. + +unpartition_key([_Partition, Key]) -> + Key. |