summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Lehnardt <jan@apache.org>2020-07-26 20:02:35 +0200
committerJan Lehnardt <jan@apache.org>2020-07-26 20:09:36 +0200
commit672a790139febef2c0924cd1875d167b7ae95031 (patch)
tree72f3a149727b072788034b8eb480a5841f5ea8e2
parent92d1c4399ebb4bd1a9c64b9919f6cbe752b50bd0 (diff)
downloadcouchdb-672a790139febef2c0924cd1875d167b7ae95031.tar.gz
feat(access): main db update and validation logic
-rw-r--r--src/couch/src/couch_db.erl230
-rw-r--r--src/couch/src/couch_db_updater.erl147
2 files changed, 319 insertions, 58 deletions
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index e1d726dc9..cac768df4 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -30,6 +30,9 @@
is_admin/1,
check_is_admin/1,
check_is_member/1,
+ validate_access/2,
+ check_access/2,
+ has_access_enabled/1,
name/1,
get_after_doc_read_fun/1,
@@ -135,6 +138,8 @@
-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
-include("couch_db_int.hrl").
-define(DBNAME_REGEX,
@@ -276,6 +281,12 @@ wait_for_compaction(#db{main_pid=Pid}=Db, Timeout) ->
ok
end.
+has_access_enabled(#db{access=true}) -> true;
+has_access_enabled(_) -> false.
+
+is_read_from_ddoc_cache(Options) ->
+ lists:member(ddoc_cache, Options).
+
delete_doc(Db, Id, Revisions) ->
DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
{ok, [Result]} = update_docs(Db, DeletedDocs, []),
@@ -284,26 +295,36 @@ delete_doc(Db, Id, Revisions) ->
open_doc(Db, IdOrDocInfo) ->
open_doc(Db, IdOrDocInfo, []).
-open_doc(Db, Id, Options) ->
+open_doc(Db, Id, Options0) ->
increment_stat(Db, [couchdb, database_reads]),
+ Options = case has_access_enabled(Db) of
+ true -> Options0 ++ [conflicts];
+ _Else -> Options0
+ end,
case open_doc_int(Db, Id, Options) of
{ok, #doc{deleted=true}=Doc} ->
case lists:member(deleted, Options) of
true ->
- apply_open_options({ok, Doc},Options);
+ {ok, Doc};
false ->
{not_found, deleted}
end;
Else ->
- apply_open_options(Else,Options)
+ Else
end.
-apply_open_options({ok, Doc},Options) ->
- apply_open_options2(Doc,Options);
-apply_open_options(Else,_Options) ->
+apply_open_options(Db, {ok, Doc}, Options) ->
+ ok = validate_access(Db, Doc, Options),
+ apply_open_options1({ok, Doc}, Options);
+apply_open_options(_Db, Else, _Options) ->
+ Else.
+
+apply_open_options1({ok, Doc}, Options) ->
+ apply_open_options2(Doc, Options);
+apply_open_options1(Else, _Options) ->
Else.
-apply_open_options2(Doc,[]) ->
+apply_open_options2(Doc, []) ->
{ok, Doc};
apply_open_options2(#doc{atts=Atts0,revs=Revs}=Doc,
[{atts_since, PossibleAncestors}|Rest]) ->
@@ -317,8 +338,8 @@ apply_open_options2(#doc{atts=Atts0,revs=Revs}=Doc,
apply_open_options2(Doc#doc{atts=Atts}, Rest);
apply_open_options2(Doc, [ejson_body | Rest]) ->
apply_open_options2(couch_doc:with_ejson_body(Doc), Rest);
-apply_open_options2(Doc,[_|Rest]) ->
- apply_open_options2(Doc,Rest).
+apply_open_options2(Doc, [_|Rest]) ->
+ apply_open_options2(Doc, Rest).
find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
@@ -336,7 +357,7 @@ find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
open_doc_revs(Db, Id, Revs, Options) ->
increment_stat(Db, [couchdb, database_reads]),
[{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
- {ok, [apply_open_options(Result, Options) || Result <- Results]}.
+ {ok, [apply_open_options(Db, Result, Options) || Result <- Results]}.
% Each returned result is a list of tuples:
% {Id, MissingRevs, PossibleAncestors}
@@ -577,7 +598,8 @@ get_db_info(Db) ->
name = Name,
compactor_pid = Compactor,
instance_start_time = StartTime,
- committed_update_seq = CommittedUpdateSeq
+ committed_update_seq = CommittedUpdateSeq,
+ access = Access
} = Db,
{ok, DocCount} = get_doc_count(Db),
{ok, DelDocCount} = get_del_doc_count(Db),
@@ -609,7 +631,8 @@ get_db_info(Db) ->
{committed_update_seq, CommittedUpdateSeq},
{compacted_seq, CompactedSeq},
{props, Props},
- {uuid, Uuid}
+ {uuid, Uuid},
+ {access, Access}
],
{ok, InfoList}.
@@ -724,6 +747,73 @@ security_error_type(#user_ctx{name=null}) ->
security_error_type(#user_ctx{name=_}) ->
forbidden.
+is_per_user_ddoc(#doc{access=[]}) -> false;
+is_per_user_ddoc(#doc{access=[<<"_users">>]}) -> false;
+is_per_user_ddoc(_) -> true.
+
+
+validate_access(Db, Doc) ->
+ validate_access(Db, Doc, []).
+
+validate_access(Db, Doc, Options) ->
+ validate_access1(has_access_enabled(Db), Db, Doc, Options).
+
+validate_access1(false, _Db, _Doc, _Options) -> ok;
+validate_access1(true, Db, #doc{meta=Meta}=Doc, Options) ->
+ case proplists:get_value(conflicts, Meta) of
+ undefined -> % no conflicts
+ case is_read_from_ddoc_cache(Options) andalso is_per_user_ddoc(Doc) of
+ true -> throw({not_found, missing});
+ _False -> validate_access2(Db, Doc)
+ end;
+ _Else -> % only admins can read conflicted docs in _access dbs
+ case is_admin(Db) of
+ true -> ok;
+ _Else2 -> throw({forbidden, <<"document is in conflict">>})
+ end
+ end.
+validate_access2(Db, Doc) ->
+ validate_access3(check_access(Db, Doc)).
+
+validate_access3(true) -> ok;
+validate_access3(_) -> throw({forbidden, <<"can't touch this">>}).
+
+check_access(Db, #doc{access=Access}=Doc) ->
+ check_access(Db, Access);
+check_access(Db, Access) ->
+ #user_ctx{
+ name=UserName,
+ roles=UserRoles
+ } = Db#db.user_ctx,
+ case Access of
+ [] ->
+ % if doc has no _access, userCtX must be admin
+ is_admin(Db);
+ Access ->
+ % if doc has _access, userCtx must be admin OR matching user or role
+ % _access = ["a", "b", ]
+ case is_admin(Db) of
+ true ->
+ true;
+ _ ->
+ case {check_name(UserName, Access), check_roles(UserRoles, Access)} of
+ {true, _} -> true;
+ {_, true} -> true;
+ _ -> false
+ end
+ end
+ end.
+
+check_name(null, _Access) -> true;
+check_name(UserName, Access) ->
+ lists:member(UserName, Access).
+% nicked from couch_db:check_security
+
+check_roles(Roles, Access) ->
+ UserRolesSet = ordsets:from_list(Roles),
+ RolesSet = ordsets:from_list(Access ++ ["_users"]),
+ not ordsets:is_disjoint(UserRolesSet, RolesSet).
+
get_admins(#db{security=SecProps}) ->
couch_util:get_value(<<"admins">>, SecProps, {[]}).
@@ -863,9 +953,14 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
end.
validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}=Doc, _GetDiskDocFun) ->
- case catch check_is_admin(Db) of
- ok -> validate_ddoc(Db, Doc);
- Error -> Error
+ case couch_doc:has_access(Doc) of
+ true ->
+ validate_ddoc(Db, Doc);
+ _Else ->
+ case catch check_is_admin(Db) of
+ ok -> validate_ddoc(Db, Doc);
+ Error -> Error
+ end
end;
validate_doc_update(#db{validate_doc_funs = undefined} = Db, Doc, Fun) ->
ValidationFuns = load_validation_funs(Db),
@@ -1172,6 +1267,32 @@ doc_tag(#doc{meta=Meta}) ->
Else -> throw({invalid_doc_tag, Else})
end.
+validate_update(Db, Doc) ->
+ case catch validate_access(Db, Doc) of
+ ok -> Doc;
+ Error -> Error
+ end.
+
+
+validate_docs_access(Db, DocBuckets, DocErrors) ->
+ validate_docs_access1(Db, DocBuckets, {[], DocErrors}).
+
+validate_docs_access1(_Db, [], {DocBuckets0, DocErrors}) ->
+ DocBuckets1 = lists:reverse(lists:map(fun lists:reverse/1, DocBuckets0)),
+ DocBuckets = case DocBuckets1 of
+ [[]] -> [];
+ Else -> Else
+ end,
+ {ok, DocBuckets, lists:reverse(DocErrors)};
+validate_docs_access1(Db, [DocBucket|RestBuckets], {DocAcc, ErrorAcc}) ->
+ {NewBuckets, NewErrors} = lists:foldl(fun(Doc, {Acc, ErrAcc}) ->
+ case catch validate_access(Db, Doc) of
+ ok -> {[Doc|Acc], ErrAcc};
+ Error -> {Acc, [{doc_tag(Doc), Error}|ErrAcc]}
+ end
+ end, {[], ErrorAcc}, DocBucket),
+ validate_docs_access1(Db, RestBuckets, {[NewBuckets | DocAcc], NewErrors}).
+
update_docs(Db, Docs0, Options, replicated_changes) ->
Docs = tag_docs(Docs0),
@@ -1185,9 +1306,31 @@ update_docs(Db, Docs0, Options, replicated_changes) ->
DocBuckets2 = [[doc_flush_atts(Db, check_dup_atts(Doc))
|| Doc <- Bucket] || Bucket <- DocBuckets],
- {ok, _} = write_and_commit(Db, DocBuckets2,
+ {ok, Results} = write_and_commit(Db, DocBuckets2,
NonRepDocs, [merge_conflicts | Options]),
- {ok, DocErrors};
+
+ case couch_db:has_access_enabled(Db) of
+ false ->
+ % we’re done here
+ {ok, DocErrors};
+ _ ->
+ AccessViolations = lists:filter(fun({_Ref, Tag}) -> Tag =:= access end, Results),
+ case length(AccessViolations) of
+ 0 ->
+ % we’re done here
+ {ok, DocErrors};
+ _ ->
+ % dig out FDIs from Docs matching our tags/refs
+ DocsDict = lists:foldl(fun(Doc, Dict) ->
+ Tag = doc_tag(Doc),
+ dict:store(Tag, Doc, Dict)
+ end, dict:new(), Docs),
+ AccessResults = lists:map(fun({Ref, Access}) ->
+ { dict:fetch(Ref, DocsDict), Access }
+ end, AccessViolations),
+ {ok, AccessResults}
+ end
+ end;
update_docs(Db, Docs0, Options, interactive_edit) ->
Docs = tag_docs(Docs0),
@@ -1272,7 +1415,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
MergeConflicts = lists:member(merge_conflicts, Options),
MRef = erlang:monitor(process, Pid),
try
- Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts},
+ Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, Ctx},
case collect_results_with_metrics(Pid, MRef, []) of
{ok, Results} -> {ok, Results};
retry ->
@@ -1286,7 +1429,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
% We only retry once
DocBuckets3 = prepare_doc_summaries(Db2, DocBuckets2),
close(Db2),
- Pid ! {update_docs, self(), DocBuckets3, NonRepDocs, MergeConflicts},
+ Pid ! {update_docs, self(), DocBuckets3, NonRepDocs, MergeConflicts, Ctx},
case collect_results_with_metrics(Pid, MRef, []) of
{ok, Results} -> {ok, Results};
retry -> throw({update_error, compaction_retry})
@@ -1475,6 +1618,11 @@ open_read_stream(Db, AttState) ->
is_active_stream(Db, StreamEngine) ->
couch_db_engine:is_active_stream(Db, StreamEngine).
+changes_since(Db, StartSeq, Fun, Options, Acc) when is_record(Db, db) ->
+ case couch_db:has_access_enabled(Db) and not couch_db:is_admin(Db) of
+ true -> couch_mrview:query_changes_access(Db, StartSeq, Fun, Options, Acc);
+ false -> couch_db_engine:fold_changes(Db, StartSeq, Fun, Options, Acc)
+ end.
calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
Seq;
@@ -1594,8 +1742,10 @@ fold_changes(Db, StartSeq, UserFun, UserAcc) ->
fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
- couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts).
-
+ case couch_db:has_access_enabled(Db) and not couch_db:is_admin(Db) of
+ true -> couch_mrview:query_changes_access(Db, StartSeq, UserFun, Opts, UserAcc);
+ false -> couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts)
+ end.
fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
fold_purge_infos(Db, StartPurgeSeq, Fun, Acc, []).
@@ -1616,7 +1766,7 @@ open_doc_revs_int(Db, IdRevs, Options) ->
lists:zipwith(
fun({Id, Revs}, Lookup) ->
case Lookup of
- #full_doc_info{rev_tree=RevTree} ->
+ #full_doc_info{rev_tree=RevTree, access=Access} ->
{FoundRevs, MissingRevs} =
case Revs of
all ->
@@ -1636,7 +1786,7 @@ open_doc_revs_int(Db, IdRevs, Options) ->
% we have the rev in our list but know nothing about it
{{not_found, missing}, {Pos, Rev}};
#leaf{deleted=IsDeleted, ptr=SummaryPtr} ->
- {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
+ {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath, Access)}
end
end, FoundRevs),
Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
@@ -1652,21 +1802,27 @@ open_doc_revs_int(Db, IdRevs, Options) ->
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
case couch_db_engine:open_local_docs(Db, [Id]) of
[#doc{} = Doc] ->
- apply_open_options({ok, Doc}, Options);
+ case Doc#doc.body of
+ { Body } ->
+ Access = couch_util:get_value(<<"_access">>, Body),
+ apply_open_options(Db, {ok, Doc#doc{access = Access}}, Options);
+ _Else ->
+ apply_open_options(Db, {ok, Doc}, Options)
+ end;
[not_found] ->
{not_found, missing}
end;
-open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) ->
+open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_],access=Access}=DocInfo, Options) ->
#rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo,
- Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}),
- apply_open_options(
+ Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}, Access),
+ apply_open_options(Db,
{ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}}, Options);
-open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
+open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree,access=Access}=FullDocInfo, Options) ->
#doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} =
DocInfo = couch_doc:to_doc_info(FullDocInfo),
{[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
- Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
- apply_open_options(
+ Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath, Access),
+ apply_open_options(Db,
{ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options);
open_doc_int(Db, Id, Options) ->
case get_full_doc_info(Db, Id) of
@@ -1716,22 +1872,28 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
true -> [{local_seq, Seq}]
end.
+make_doc(Db, Id, Deleted, Bp, RevisionPath) ->
+ make_doc(Db, Id, Deleted, Bp, RevisionPath, []);
+make_doc(Db, Id, Deleted, Bp, {Pos, Revs}) ->
+ make_doc(Db, Id, Deleted, Bp, {Pos, Revs}, []).
-make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
+make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath, Access) ->
#doc{
id = Id,
revs = RevisionPath,
body = [],
atts = [],
- deleted = Deleted
+ deleted = Deleted,
+ access = Access
};
-make_doc(#db{} = Db, Id, Deleted, Bp, {Pos, Revs}) ->
+make_doc(#db{} = Db, Id, Deleted, Bp, {Pos, Revs}, Access) ->
RevsLimit = get_revs_limit(Db),
Doc0 = couch_db_engine:read_doc_body(Db, #doc{
id = Id,
revs = {Pos, lists:sublist(Revs, 1, RevsLimit)},
body = Bp,
- deleted = Deleted
+ deleted = Deleted,
+ access = Access
}),
Doc1 = case Doc0#doc.atts of
BinAtts when is_binary(BinAtts) ->
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 1ca804c05..41d36616a 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -22,7 +22,10 @@
-define(IDLE_LIMIT_DEFAULT, 61000).
-define(DEFAULT_MAX_PARTITION_SIZE, 16#280000000). % 10 GiB
-
+-define(DEFAULT_SECURITY_OBJECT, [
+ {<<"members">>,{[{<<"roles">>,[<<"_admin">>]}]}},
+ {<<"admins">>, {[{<<"roles">>,[<<"_admin">>]}]}}
+]).
-record(merge_acc, {
revs_limit,
@@ -37,7 +40,7 @@
init({Engine, DbName, FilePath, Options0}) ->
erlang:put(io_priority, {db_update, DbName}),
update_idle_limit_from_config(),
- DefaultSecObj = default_security_object(DbName),
+ DefaultSecObj = default_security_object(DbName, Options0),
Options = [{default_security_object, DefaultSecObj} | Options0],
try
{ok, EngineState} = couch_db_engine:init(Engine, FilePath, Options),
@@ -162,7 +165,7 @@ handle_cast(Msg, #db{name = Name} = Db) ->
{stop, Msg, Db}.
-handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts},
+handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts, UserCtx},
Db) ->
GroupedDocs2 = sort_and_tag_grouped_docs(Client, GroupedDocs),
if NonRepDocs == [] ->
@@ -173,7 +176,7 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts},
Clients = [Client]
end,
NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
- try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts) of
+ try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts, UserCtx) of
{ok, Db2, UpdatedDDocIds} ->
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
case {couch_db:get_update_seq(Db), couch_db:get_update_seq(Db2)} of
@@ -248,7 +251,11 @@ sort_and_tag_grouped_docs(Client, GroupedDocs) ->
% The merge_updates function will fail and the database can end up with
% duplicate documents if the incoming groups are not sorted, so as a sanity
% check we sort them again here. See COUCHDB-2735.
- Cmp = fun([#doc{id=A}|_], [#doc{id=B}|_]) -> A < B end,
+ Cmp = fun
+ ([], []) -> false; % TODO: re-evaluate this addition, might be a
+ % superflous now
+ ([#doc{id=A}|_], [#doc{id=B}|_]) -> A < B
+ end,
lists:map(fun(DocGroup) ->
[{Client, maybe_tag_doc(D)} || D <- DocGroup]
end, lists:sort(Cmp, GroupedDocs)).
@@ -298,7 +305,7 @@ init_db(DbName, FilePath, EngineState, Options) ->
BDU = couch_util:get_value(before_doc_update, Options, nil),
ADR = couch_util:get_value(after_doc_read, Options, nil),
-
+ Access = couch_util:get_value(access, Options, false),
NonCreateOpts = [Opt || Opt <- Options, Opt /= create],
InitDb = #db{
@@ -308,7 +315,8 @@ init_db(DbName, FilePath, EngineState, Options) ->
instance_start_time = StartTime,
options = NonCreateOpts,
before_doc_update = BDU,
- after_doc_read = ADR
+ after_doc_read = ADR,
+ access = Access
},
DbProps = couch_db_engine:get_props(InitDb),
@@ -364,7 +372,8 @@ flush_trees(#db{} = Db,
active = WrittenSize,
external = ExternalSize
},
- atts = AttSizeInfo
+ atts = AttSizeInfo,
+ access = NewDoc#doc.access
},
{Leaf, add_sizes(Type, Leaf, SizesAcc)};
#leaf{} ->
@@ -439,6 +448,9 @@ doc_tag(#doc{meta=Meta}) ->
Else -> throw({invalid_doc_tag, Else})
end.
+merge_rev_trees([[]], [], Acc) ->
+ % validate_docs_access left us with no docs to merge
+ {ok, Acc};
merge_rev_trees([], [], Acc) ->
{ok, Acc#merge_acc{
add_infos = lists:reverse(Acc#merge_acc.add_infos)
@@ -611,19 +623,26 @@ maybe_stem_full_doc_info(#full_doc_info{rev_tree = Tree} = Info, Limit) ->
Info
end.
-update_docs_int(Db, DocsList, LocalDocs, MergeConflicts) ->
+update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, UserCtx) ->
UpdateSeq = couch_db_engine:get_update_seq(Db),
RevsLimit = couch_db_engine:get_revs_limit(Db),
Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
+ % TODO: maybe a perf hit, instead of zip3-ing existin Accesses into
+ % our doc lists, maybe find 404 docs differently down in
+ % validate_docs_access (revs is [], which we can then use
+ % to skip validation as we know it is the first doc rev)
+ Accesses = [Access || [{_Client, #doc{access=Access}}|_] <- DocsList],
+
% lookup up the old documents, if they exist.
OldDocLookups = couch_db_engine:open_docs(Db, Ids),
- OldDocInfos = lists:zipwith(fun
- (_Id, #full_doc_info{} = FDI) ->
+
+ OldDocInfos = lists:zipwith3(fun
+ (_Id, #full_doc_info{} = FDI, _Access) ->
FDI;
- (Id, not_found) ->
- #full_doc_info{id=Id}
- end, Ids, OldDocLookups),
+ (Id, not_found, Access) ->
+ #full_doc_info{id=Id,access=Access}
+ end, Ids, OldDocLookups, Accesses),
%% Get the list of full partitions
FullPartitions = case couch_db:is_partitioned(Db) of
@@ -653,7 +672,14 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts) ->
cur_seq = UpdateSeq,
full_partitions = FullPartitions
},
- {ok, AccOut} = merge_rev_trees(DocsList, OldDocInfos, AccIn),
+ % Loop over DocsList, validate_access for each OldDocInfo on Db,
+ %. if no OldDocInfo, then send to DocsListValidated, keep OldDocsInfo
+ % if valid, then send to DocsListValidated, OldDocsInfo
+ %. if invalid, then send_result tagged `access`(c.f. `conflict)
+ %. and don’t add to DLV, nor ODI
+
+ { DocsListValidated, OldDocInfosValidated } = validate_docs_access(Db, UserCtx, DocsList, OldDocInfos),
+ {ok, AccOut} = merge_rev_trees(DocsListValidated, OldDocInfosValidated, AccIn),
#merge_acc{
add_infos = NewFullDocInfos,
rem_seqs = RemSeqs
@@ -663,7 +689,8 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts) ->
% the trees, the attachments are already written to disk)
{ok, IndexFDIs} = flush_trees(Db, NewFullDocInfos, []),
Pairs = pair_write_info(OldDocLookups, IndexFDIs),
- LocalDocs2 = update_local_doc_revs(LocalDocs),
+ LocalDocs1 = apply_local_docs_access(Db, LocalDocs),
+ LocalDocs2 = update_local_doc_revs(LocalDocs1),
{ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2),
@@ -676,15 +703,83 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts) ->
length(LocalDocs2)
),
- % Check if we just updated any design documents, and update the validation
- % funs if we did.
+ % Check if we just updated any non-access design documents,
+ % and update the validation funs if we did.
+ NonAccessIds = [Id || [{_Client, #doc{id=Id,access=[]}}|_] <- DocsList],
UpdatedDDocIds = lists:flatmap(fun
(<<"_design/", _/binary>> = Id) -> [Id];
(_) -> []
- end, Ids),
+ end, NonAccessIds),
{ok, commit_data(Db1), UpdatedDDocIds}.
+% check_access(Db, UserCtx, Access) ->
+% check_access(Db, UserCtx, couch_db:has_access_enabled(Db), Access).
+%
+% check_access(_Db, UserCtx, false, _Access) ->
+% true;
+
+% at this point, we already validated this Db is access enabled, so do the checks right away.
+check_access(Db, UserCtx, Access) -> couch_db:check_access(Db#db{user_ctx=UserCtx}, Access).
+
+% TODO: looks like we go into validation here unconditionally and only check in
+% check_access() whether the Db has_access_enabled(), we should do this
+% here on the outside. Might be our perf issue.
+% However, if it is, that means we have to speed this up as it would still
+% be too slow for when access is enabled.
+validate_docs_access(Db, UserCtx, DocsList, OldDocInfos) ->
+ case couch_db:has_access_enabled(Db) of
+ true -> validate_docs_access_int(Db, UserCtx, DocsList, OldDocInfos);
+ _Else -> { DocsList, OldDocInfos }
+ end.
+
+validate_docs_access_int(Db, UserCtx, DocsList, OldDocInfos) ->
+ validate_docs_access(Db, UserCtx, DocsList, OldDocInfos, [], []).
+
+validate_docs_access(_Db, UserCtx, [], [], DocsListValidated, OldDocInfosValidated) ->
+ { lists:reverse(DocsListValidated), lists:reverse(OldDocInfosValidated) };
+validate_docs_access(Db, UserCtx, [Docs | DocRest], [OldInfo | OldInfoRest], DocsListValidated, OldDocInfosValidated) ->
+ % loop over Docs as {Client, NewDoc}
+ % validate Doc
+ % if valid, then put back in Docs
+ % if not, then send_result and skip
+ NewDocs = lists:foldl(fun({ Client, Doc }, Acc) ->
+ % check if we are allowed to update the doc, skip when new doc
+ OldDocMatchesAccess = case OldInfo#full_doc_info.rev_tree of
+ [] -> true;
+ _ -> check_access(Db, UserCtx, OldInfo#full_doc_info.access)
+ end,
+
+ NewDocMatchesAccess = check_access(Db, UserCtx, Doc#doc.access),
+ case OldDocMatchesAccess andalso NewDocMatchesAccess of
+ true -> % if valid, then send to DocsListValidated, OldDocsInfo
+ % and store the access context on the new doc
+ [{Client, Doc} | Acc];
+ _Else2 -> % if invalid, then send_result tagged `access`(c.f. `conflict)
+ % and don’t add to DLV, nor ODI
+ send_result(Client, Doc, access),
+ Acc
+ end
+ end, [], Docs),
+
+ { NewDocsListValidated, NewOldDocInfosValidated } = case length(NewDocs) of
+ 0 -> % we sent out all docs as invalid access, drop the old doc info associated with it
+ { [NewDocs | DocsListValidated], OldDocInfosValidated };
+ _ ->
+ { [NewDocs | DocsListValidated], [OldInfo | OldDocInfosValidated] }
+ end,
+ validate_docs_access(Db, UserCtx, DocRest, OldInfoRest, NewDocsListValidated, NewOldDocInfosValidated).
+
+apply_local_docs_access(Db, Docs) ->
+ apply_local_docs_access1(couch_db:has_access_enabled(Db), Docs).
+
+apply_local_docs_access1(false, Docs) ->
+ Docs;
+apply_local_docs_access1(true, Docs) ->
+ lists:map(fun({Client, #doc{access = Access, body = {Body}} = Doc}) ->
+ Doc1 = Doc#doc{body = {[{<<"_access">>, Access} | Body]}},
+ {Client, Doc1}
+ end, Docs).
update_local_doc_revs(Docs) ->
lists:foldl(fun({Client, Doc}, Acc) ->
@@ -850,19 +945,23 @@ get_meta_body_size(Meta) ->
ExternalSize.
+default_security_object(DbName, []) ->
+ default_security_object(DbName);
+default_security_object(DbName, Options) ->
+ case lists:member({access, true}, Options) of
+ false -> default_security_object(DbName);
+ true -> ?DEFAULT_SECURITY_OBJECT
+ end.
default_security_object(<<"shards/", _/binary>>) ->
case config:get("couchdb", "default_security", "admin_only") of
- "admin_only" ->
- [{<<"members">>,{[{<<"roles">>,[<<"_admin">>]}]}},
- {<<"admins">>,{[{<<"roles">>,[<<"_admin">>]}]}}];
+ "admin_only" -> ?DEFAULT_SECURITY_OBJECT;
Everyone when Everyone == "everyone"; Everyone == "admin_local" ->
[]
end;
default_security_object(_DbName) ->
case config:get("couchdb", "default_security", "admin_only") of
Admin when Admin == "admin_only"; Admin == "admin_local" ->
- [{<<"members">>,{[{<<"roles">>,[<<"_admin">>]}]}},
- {<<"admins">>,{[{<<"roles">>,[<<"_admin">>]}]}}];
+ ?DEFAULT_SECURITY_OBJECT;
"everyone" ->
[]
end.