diff options
Diffstat (limited to 'src/fabric/src/fabric2_db.erl')
-rw-r--r-- | src/fabric/src/fabric2_db.erl | 2363 |
1 files changed, 2363 insertions, 0 deletions
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl new file mode 100644 index 000000000..b3e510b2e --- /dev/null +++ b/src/fabric/src/fabric2_db.erl @@ -0,0 +1,2363 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_db). + + +-export([ + create/2, + open/2, + delete/2, + undelete/4, + + list_dbs/0, + list_dbs/1, + list_dbs/3, + + list_dbs_info/0, + list_dbs_info/1, + list_dbs_info/3, + + list_deleted_dbs_info/0, + list_deleted_dbs_info/1, + list_deleted_dbs_info/3, + + check_is_admin/1, + check_is_member/1, + + name/1, + get_after_doc_read_fun/1, + get_before_doc_update_fun/1, + get_committed_update_seq/1, + get_compacted_seq/1, + get_compactor_pid/1, + get_db_info/1, + %% get_partition_info/2, + get_del_doc_count/1, + get_doc_count/1, + get_doc_count/2, + %% get_epochs/1, + %% get_filepath/1, + get_instance_start_time/1, + get_pid/1, + get_revs_limit/1, + get_revs_limit/2, + get_security/1, + get_security/2, + get_update_seq/1, + get_user_ctx/1, + get_uuid/1, + %% get_purge_seq/1, + %% get_oldest_purge_seq/1, + %% get_purge_infos_limit/1, + + is_clustered/1, + is_db/1, + is_partitioned/1, + is_system_db/1, + is_system_db_name/1, + is_replicator_db/1, + is_users_db/1, + + set_revs_limit/2, + %% set_purge_infos_limit/2, + set_security/2, + set_user_ctx/2, + + ensure_full_commit/1, + ensure_full_commit/2, + + %% load_validation_funs/1, + %% reload_validation_funs/1, + + open_doc/2, + open_doc/3, + open_doc_revs/4, + %% open_doc_int/3, + get_doc_info/2, + get_full_doc_info/2, + get_full_doc_infos/2, + get_missing_revs/2, + get_design_docs/1, + %% get_purge_infos/2, + + %% get_minimum_purge_seq/1, + %% purge_client_exists/3, + + validate_docid/1, + %% doc_from_json_obj_validate/2, + + update_doc/2, + update_doc/3, + update_docs/2, + update_docs/3, + %% delete_doc/3, + + %% purge_docs/2, + %% purge_docs/3, + + read_attachment/3, + write_attachment/3, + + fold_docs/3, + fold_docs/4, + fold_docs/5, + fold_design_docs/4, + fold_local_docs/4, + fold_changes/4, + fold_changes/5, + %% count_changes_since/2, + %% fold_purge_infos/4, + %% fold_purge_infos/5, + + %% calculate_start_seq/3, + %% owner_of/2, + + %% start_compact/1, + %% cancel_compact/1, + %% wait_for_compaction/1, + %% wait_for_compaction/2, + + dbname_suffix/1, + normalize_dbname/1, + validate_dbname/1, + + %% make_doc/5, + new_revid/2, + + apply_open_doc_opts/3 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include("fabric2.hrl"). + + +% Default max database name length is based on CouchDb < 4.x compatibility. See +% default.ini entry for additional information. +-define(DEFAULT_MAX_DATABASE_NAME_LENGTH, 238). + +-define(DBNAME_REGEX, + "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*" % use the stock CouchDB regex + "(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end +). + +-define(FIRST_DDOC_KEY, <<"_design/">>). +-define(LAST_DDOC_KEY, <<"_design0">>). + +-define(RETURN(Term), throw({?MODULE, Term})). + +-define(DEFAULT_UPDATE_DOCS_BATCH_SIZE, 2500000). + + +-record(bacc, { + db, + docs, + batch_size, + options, + rev_futures, + seen, + results +}). + + +create(DbName, Options) -> + case validate_dbname(DbName) of + ok -> + Result = fabric2_fdb:transactional(DbName, Options, fun(TxDb) -> + case fabric2_fdb:exists(TxDb) of + true -> + {error, file_exists}; + false -> + fabric2_fdb:create(TxDb, Options) + end + end), + % We cache outside of the transaction so that we're sure + % that the transaction was committed. + case Result of + #{} = Db0 -> + Db1 = maybe_add_sys_db_callbacks(Db0), + ok = fabric2_server:store(Db1), + fabric2_db_plugin:after_db_create(DbName, get_uuid(Db1)), + {ok, Db1#{tx := undefined}}; + Error -> + Error + end; + Error -> + Error + end. + + +open(DbName, Options) -> + UUID = fabric2_util:get_value(uuid, Options), + case fabric2_server:fetch(DbName, UUID) of + #{} = Db -> + Db1 = maybe_set_user_ctx(Db, Options), + Db2 = maybe_set_interactive(Db1, Options), + {ok, require_member_check(Db2)}; + undefined -> + Result = fabric2_fdb:transactional(DbName, Options, fun(TxDb) -> + fabric2_fdb:open(TxDb, Options) + end), + % Cache outside the transaction retry loop + case Result of + #{} = Db0 -> + Db1 = maybe_add_sys_db_callbacks(Db0), + ok = fabric2_server:store(Db1), + Db2 = Db1#{tx := undefined}, + {ok, require_member_check(Db2)}; + Error -> + Error + end + end. + + +delete(DbName, Options) -> + % Delete doesn't check user_ctx, that's done at the HTTP API level + % here we just care to get the `database_does_not_exist` error thrown + Options1 = lists:keystore(user_ctx, 1, Options, ?ADMIN_CTX), + case lists:keyfind(deleted_at, 1, Options1) of + {deleted_at, TimeStamp} -> + fabric2_fdb:transactional(DbName, Options1, fun(TxDb) -> + fabric2_fdb:remove_deleted_db(TxDb, TimeStamp) + end); + false -> + {ok, Db} = open(DbName, Options1), + Resp = fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:delete(TxDb) + end), + if Resp /= ok -> Resp; true -> + fabric2_db_plugin:after_db_delete(DbName, get_uuid(Db)), + fabric2_server:remove(DbName) + end + end. + + +undelete(DbName, TgtDbName, TimeStamp, Options) -> + case validate_dbname(TgtDbName) of + ok -> + Resp = fabric2_fdb:transactional(DbName, Options, + fun(TxDb) -> + fabric2_fdb:undelete(TxDb, TgtDbName, TimeStamp) + end + ), + if Resp /= ok -> ok; true -> + {ok, Db} = open(TgtDbName, Options), + fabric2_db_plugin:after_db_create(TgtDbName, get_uuid(Db)) + end, + Resp; + Error -> + Error + end. + + +list_dbs() -> + list_dbs([]). + + +list_dbs(Options) -> + Callback = fun(DbName, Acc) -> [DbName | Acc] end, + DbNames = fabric2_fdb:transactional(fun(Tx) -> + fabric2_fdb:list_dbs(Tx, Callback, [], Options) + end), + lists:reverse(DbNames). + + +list_dbs(UserFun, UserAcc0, Options) -> + FoldFun = fun + (DbName, Acc) -> maybe_stop(UserFun({row, [{id, DbName}]}, Acc)) + end, + fabric2_fdb:transactional(fun(Tx) -> + try + UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)), + UserAcc2 = fabric2_fdb:list_dbs( + Tx, + FoldFun, + UserAcc1, + Options + ), + {ok, maybe_stop(UserFun(complete, UserAcc2))} + catch throw:{stop, FinalUserAcc} -> + {ok, FinalUserAcc} + end + end). + + +list_dbs_info() -> + list_dbs_info([]). + + +list_dbs_info(Options) -> + Callback = fun(Value, Acc) -> + NewAcc = case Value of + {meta, _} -> Acc; + {row, DbInfo} -> [DbInfo | Acc]; + complete -> Acc + end, + {ok, NewAcc} + end, + {ok, DbInfos} = list_dbs_info(Callback, [], Options), + {ok, lists:reverse(DbInfos)}. + + +list_dbs_info(UserFun, UserAcc0, Options) -> + FoldFun = fun(DbName, InfoFuture, {FutureQ, Count, Acc}) -> + NewFutureQ = queue:in({DbName, InfoFuture}, FutureQ), + drain_info_futures(NewFutureQ, Count + 1, UserFun, Acc) + end, + fabric2_fdb:transactional(fun(Tx) -> + try + UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)), + InitAcc = {queue:new(), 0, UserAcc1}, + {FinalFutureQ, _, UserAcc2} = fabric2_fdb:list_dbs_info( + Tx, + FoldFun, + InitAcc, + Options + ), + UserAcc3 = drain_all_info_futures(FinalFutureQ, UserFun, UserAcc2), + {ok, maybe_stop(UserFun(complete, UserAcc3))} + catch throw:{stop, FinalUserAcc} -> + {ok, FinalUserAcc} + end + end). + + +list_deleted_dbs_info() -> + list_deleted_dbs_info([]). + + +list_deleted_dbs_info(Options) -> + Callback = fun(Value, Acc) -> + NewAcc = case Value of + {meta, _} -> Acc; + {row, DbInfo} -> [DbInfo | Acc]; + complete -> Acc + end, + {ok, NewAcc} + end, + {ok, DbInfos} = list_deleted_dbs_info(Callback, [], Options), + {ok, lists:reverse(DbInfos)}. + + +list_deleted_dbs_info(UserFun, UserAcc0, Options0) -> + Dir = fabric2_util:get_value(dir, Options0, fwd), + StartKey0 = fabric2_util:get_value(start_key, Options0), + EndKey0 = fabric2_util:get_value(end_key, Options0), + + {FirstBinary, LastBinary} = case Dir of + fwd -> {<<>>, <<255>>}; + rev -> {<<255>>, <<>>} + end, + + StartKey1 = case StartKey0 of + undefined -> + {FirstBinary}; + DbName0 when is_binary(DbName0) -> + {DbName0, FirstBinary}; + [DbName0, TimeStamp0] when is_binary(DbName0), is_binary(TimeStamp0) -> + {DbName0, TimeStamp0}; + BadStartKey -> + erlang:error({invalid_start_key, BadStartKey}) + end, + EndKey1 = case EndKey0 of + undefined -> + {LastBinary}; + DbName1 when is_binary(DbName1) -> + {DbName1, LastBinary}; + [DbName1, TimeStamp1] when is_binary(DbName1), is_binary(TimeStamp1) -> + {DbName1, TimeStamp1}; + BadEndKey -> + erlang:error({invalid_end_key, BadEndKey}) + end, + + Options1 = Options0 -- [{start_key, StartKey0}, {end_key, EndKey0}], + Options2 = [ + {start_key, StartKey1}, + {end_key, EndKey1}, + {wrap_keys, false} + ] ++ Options1, + + FoldFun = fun(DbName, TimeStamp, InfoFuture, {FutureQ, Count, Acc}) -> + NewFutureQ = queue:in({DbName, TimeStamp, InfoFuture}, FutureQ), + drain_deleted_info_futures(NewFutureQ, Count + 1, UserFun, Acc) + end, + fabric2_fdb:transactional(fun(Tx) -> + try + UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)), + InitAcc = {queue:new(), 0, UserAcc1}, + {FinalFutureQ, _, UserAcc2} = fabric2_fdb:list_deleted_dbs_info( + Tx, + FoldFun, + InitAcc, + Options2 + ), + UserAcc3 = drain_all_deleted_info_futures( + FinalFutureQ, + UserFun, + UserAcc2 + ), + {ok, maybe_stop(UserFun(complete, UserAcc3))} + catch throw:{stop, FinalUserAcc} -> + {ok, FinalUserAcc} + end + end). + + +is_admin(Db, {SecProps}) when is_list(SecProps) -> + case fabric2_db_plugin:check_is_admin(Db) of + true -> + true; + false -> + UserCtx = get_user_ctx(Db), + {Admins} = get_admins(SecProps), + is_authorized(Admins, UserCtx) + end. + + +check_is_admin(Db) -> + check_is_admin(Db, get_security(Db)). + + +check_is_admin(Db, SecDoc) -> + case is_admin(Db, SecDoc) of + true -> + ok; + false -> + UserCtx = get_user_ctx(Db), + Reason = <<"You are not a db or server admin.">>, + throw_security_error(UserCtx, Reason) + end. + + +check_is_member(Db) -> + check_is_member(Db, get_security(Db)). + + +check_is_member(Db, SecDoc) -> + case is_member(Db, SecDoc) of + true -> + ok; + false -> + UserCtx = get_user_ctx(Db), + throw_security_error(UserCtx) + end. + + +require_admin_check(#{} = Db) -> + Db#{security_fun := fun check_is_admin/2}. + + +require_member_check(#{} = Db) -> + Db#{security_fun := fun check_is_member/2}. + + +name(#{name := DbName}) -> + DbName. + + +get_after_doc_read_fun(#{after_doc_read := AfterDocRead}) -> + AfterDocRead. + + +get_before_doc_update_fun(#{before_doc_update := BeforeDocUpdate}) -> + BeforeDocUpdate. + +get_committed_update_seq(#{} = Db) -> + get_update_seq(Db). + + +get_compacted_seq(#{} = Db) -> + get_update_seq(Db). + + +get_compactor_pid(#{} = _Db) -> + nil. + + +get_db_info(#{} = Db) -> + DbProps = fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:get_info(TxDb) + end), + {ok, make_db_info(name(Db), DbProps)}. + + +get_del_doc_count(#{} = Db) -> + get_doc_count(Db, <<"doc_del_count">>). + + +get_doc_count(Db) -> + get_doc_count(Db, <<"doc_count">>). + + +get_doc_count(Db, undefined) -> + get_doc_count(Db, <<"doc_count">>); + +get_doc_count(Db, <<"_all_docs">>) -> + get_doc_count(Db, <<"doc_count">>); + +get_doc_count(DbName, <<"_design">>) -> + get_doc_count(DbName, <<"doc_design_count">>); + +get_doc_count(DbName, <<"_local">>) -> + get_doc_count(DbName, <<"doc_local_count">>); + +get_doc_count(Db, Key) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:get_stat(TxDb, Key) + end). + + +get_instance_start_time(#{}) -> + 0. + + +get_pid(#{}) -> + nil. + + +get_revs_limit(#{} = Db) -> + get_revs_limit(Db, []). + + +get_revs_limit(#{} = Db, Opts) -> + CurrentDb = get_cached_db(Db, Opts), + maps:get(revs_limit, CurrentDb). + + +get_security(#{} = Db) -> + get_security(Db, []). + + +get_security(#{} = Db, Opts) -> + CurrentDb = get_cached_db(Db, Opts), + maps:get(security_doc, CurrentDb). + + +get_update_seq(#{} = Db) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:get_last_change(TxDb) + end). + + +get_user_ctx(#{user_ctx := UserCtx}) -> + UserCtx. + + +get_uuid(#{uuid := UUID}) -> + UUID. + + +is_clustered(#{}) -> + false. + + +is_db(#{name := _}) -> + true; +is_db(_) -> + false. + + +is_partitioned(#{}) -> + false. + + +is_system_db(#{name := DbName}) -> + is_system_db_name(DbName). + + +is_system_db_name(DbName) when is_list(DbName) -> + is_system_db_name(?l2b(DbName)); +is_system_db_name(DbName) when is_binary(DbName) -> + Suffix = filename:basename(DbName), + case {filename:dirname(DbName), lists:member(Suffix, ?SYSTEM_DATABASES)} of + {<<".">>, Result} -> Result; + {_Prefix, false} -> false; + {Prefix, true} -> + ReOpts = [{capture,none}, dollar_endonly], + re:run(Prefix, ?DBNAME_REGEX, ReOpts) == match + end. + + +is_replicator_db(#{name := DbName}) -> + is_replicator_db(DbName); + +is_replicator_db(DbName) when is_binary(DbName) -> + fabric2_util:dbname_ends_with(DbName, <<"_replicator">>). + + +is_users_db(#{name := DbName}) -> + is_users_db(DbName); + +is_users_db(DbName) when is_binary(DbName) -> + AuthenticationDb = config:get("chttpd_auth", "authentication_db"), + CfgUsersSuffix = config:get("couchdb", "users_db_suffix", "_users"), + + IsAuthCache = if AuthenticationDb == undefined -> false; true -> + DbName == ?l2b(AuthenticationDb) + end, + IsCfgUsersDb = fabric2_util:dbname_ends_with(DbName, ?l2b(CfgUsersSuffix)), + IsGlobalUsersDb = fabric2_util:dbname_ends_with(DbName, <<"_users">>), + + IsAuthCache orelse IsCfgUsersDb orelse IsGlobalUsersDb. + + +set_revs_limit(#{} = Db0, RevsLimit) when is_integer(RevsLimit) -> + Db1 = require_admin_check(Db0), + Resp = fabric2_fdb:transactional(Db1, fun(TxDb) -> + fabric2_fdb:set_config(TxDb, revs_limit, RevsLimit) + end), + case Resp of + {ok, #{} = Db2} -> fabric2_server:store(Db2); + Err -> Err + end. + + +set_security(#{} = Db0, Security) -> + Db1 = require_admin_check(Db0), + ok = fabric2_util:validate_security_object(Security), + Resp = fabric2_fdb:transactional(Db1, fun(TxDb) -> + fabric2_fdb:set_config(TxDb, security_doc, Security) + end), + case Resp of + {ok, #{} = Db2} -> fabric2_server:store(Db2); + Err -> Err + end. + + +set_user_ctx(#{} = Db, UserCtx) -> + Db#{user_ctx := UserCtx}. + + +ensure_full_commit(#{}) -> + {ok, 0}. + + +ensure_full_commit(#{}, _Timeout) -> + {ok, 0}. + + +open_doc(#{} = Db, DocId) -> + open_doc(Db, DocId, []). + + +open_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, _Options) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + case fabric2_fdb:get_local_doc(TxDb, DocId) of + #doc{} = Doc -> {ok, Doc}; + Else -> Else + end + end); + +open_doc(#{} = Db, DocId, Options) -> + NeedsTreeOpts = [revs_info, conflicts, deleted_conflicts], + NeedsTree = (Options -- NeedsTreeOpts /= Options), + OpenDeleted = lists:member(deleted, Options), + fabric2_fdb:transactional(Db, fun(TxDb) -> + Revs = case NeedsTree of + true -> fabric2_fdb:get_all_revs(TxDb, DocId); + false -> fabric2_fdb:get_winning_revs(TxDb, DocId, 1) + end, + if Revs == [] -> {not_found, missing}; true -> + #{winner := true} = RI = lists:last(Revs), + case fabric2_fdb:get_doc_body(TxDb, DocId, RI) of + #doc{deleted = true} when not OpenDeleted -> + {not_found, deleted}; + #doc{} = Doc -> + apply_open_doc_opts(Doc, Revs, Options); + Else -> + Else + end + end + end). + + +open_doc_revs(Db, DocId, Revs, Options) -> + Latest = lists:member(latest, Options), + fabric2_fdb:transactional(Db, fun(TxDb) -> + AllRevInfos = fabric2_fdb:get_all_revs(TxDb, DocId), + RevTree = lists:foldl(fun(RI, TreeAcc) -> + RIPath = fabric2_util:revinfo_to_path(RI), + {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath), + Merged + end, [], AllRevInfos), + {Found, Missing} = case Revs of + all -> + {couch_key_tree:get_all_leafs(RevTree), []}; + _ when Latest -> + couch_key_tree:get_key_leafs(RevTree, Revs); + _ -> + couch_key_tree:get(RevTree, Revs) + end, + Docs = lists:map(fun({Value, {Pos, [Rev | RevPath]}}) -> + case Value of + ?REV_MISSING -> + % We have the rev in our list but know nothing about it + {{not_found, missing}, {Pos, Rev}}; + _ -> + RevInfo = #{ + rev_id => {Pos, Rev}, + rev_path => RevPath + }, + case fabric2_fdb:get_doc_body(TxDb, DocId, RevInfo) of + #doc{} = Doc -> + apply_open_doc_opts(Doc, AllRevInfos, Options); + Else -> + {Else, {Pos, Rev}} + end + end + end, Found), + MissingDocs = [{{not_found, missing}, MRev} || MRev <- Missing], + {ok, Docs ++ MissingDocs} + end). + + +get_doc_info(Db, DocId) -> + case get_full_doc_info(Db, DocId) of + not_found -> not_found; + FDI -> couch_doc:to_doc_info(FDI) + end. + + +get_full_doc_info(Db, DocId) -> + RevInfos = fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:get_all_revs(TxDb, DocId) + end), + if RevInfos == [] -> not_found; true -> + #{winner := true} = Winner = lists:last(RevInfos), + RevTree = lists:foldl(fun(RI, TreeAcc) -> + RIPath = fabric2_util:revinfo_to_path(RI), + {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath), + Merged + end, [], RevInfos), + #full_doc_info{ + id = DocId, + update_seq = fabric2_fdb:vs_to_seq(maps:get(sequence, Winner)), + deleted = maps:get(deleted, Winner), + rev_tree = RevTree + } + end. + + +get_full_doc_infos(Db, DocIds) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + lists:map(fun(DocId) -> + get_full_doc_info(TxDb, DocId) + end, DocIds) + end). + + +get_missing_revs(Db, JsonIdRevs) -> + IdRevs = [idrevs(IdR) || IdR <- JsonIdRevs], + AllRevInfos = fabric2_fdb:transactional(Db, fun(TxDb) -> + lists:foldl(fun({Id, _Revs}, Acc) -> + case maps:is_key(Id, Acc) of + true -> + Acc; + false -> + RevInfos = fabric2_fdb:get_all_revs(TxDb, Id), + Acc#{Id => RevInfos} + end + end, #{}, IdRevs) + end), + AllMissing = lists:flatmap(fun({Id, Revs}) -> + #{Id := RevInfos} = AllRevInfos, + Missing = try + lists:foldl(fun(RevInfo, RevAcc) -> + if RevAcc /= [] -> ok; true -> + throw(all_found) + end, + filter_found_revs(RevInfo, RevAcc) + end, Revs, RevInfos) + catch throw:all_found -> + [] + end, + if Missing == [] -> []; true -> + PossibleAncestors = find_possible_ancestors(RevInfos, Missing), + [{Id, Missing, PossibleAncestors}] + end + end, IdRevs), + {ok, AllMissing}. + + +get_design_docs(Db) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + #{ + db_prefix := DbPrefix + } = TxDb, + + Prefix = erlfdb_tuple:pack({?DB_ALL_DOCS}, DbPrefix), + Options = set_design_doc_keys([]), + FoldFun = fun({Key, Val}, Acc) -> + {DocId} = erlfdb_tuple:unpack(Key, Prefix), + RevId = erlfdb_tuple:unpack(Val), + Rev = #{ + rev_id => RevId, + rev_path => [] + }, + Future = fabric2_fdb:get_doc_body_future(TxDb, DocId, Rev), + [{DocId, Rev, Future} | Acc] + end, + Futures = fabric2_fdb:fold_range(TxDb, Prefix, FoldFun, [], Options), + + % Using foldl instead of map means that the design + % docs come out in sorted order. + lists:foldl(fun({DocId, Rev, Future}, Acc) -> + [fabric2_fdb:get_doc_body_wait(TxDb, DocId, Rev, Future) | Acc] + end, [], Futures) + end). + + +validate_docid(<<"">>) -> + throw({illegal_docid, <<"Document id must not be empty">>}); +validate_docid(<<"_design/">>) -> + throw({illegal_docid, <<"Illegal document id `_design/`">>}); +validate_docid(<<"_local/">>) -> + throw({illegal_docid, <<"Illegal document id `_local/`">>}); +validate_docid(Id) when is_binary(Id) -> + MaxLen = case config:get("couchdb", "max_document_id_length", "infinity") of + "infinity" -> infinity; + IntegerVal -> list_to_integer(IntegerVal) + end, + case MaxLen > 0 andalso byte_size(Id) > MaxLen of + true -> throw({illegal_docid, <<"Document id is too long">>}); + false -> ok + end, + case couch_util:validate_utf8(Id) of + false -> throw({illegal_docid, <<"Document id must be valid UTF-8">>}); + true -> ok + end, + case Id of + <<?DESIGN_DOC_PREFIX, _/binary>> -> ok; + <<?LOCAL_DOC_PREFIX, _/binary>> -> ok; + <<"_", _/binary>> -> + case fabric2_db_plugin:validate_docid(Id) of + true -> + ok; + false -> + throw( + {illegal_docid, + <<"Only reserved document ids may start with underscore.">>}) + end; + _Else -> ok + end; +validate_docid(Id) -> + couch_log:debug("Document id is not a string: ~p", [Id]), + throw({illegal_docid, <<"Document id must be a string">>}). + + +update_doc(Db, Doc) -> + update_doc(Db, Doc, []). + + +update_doc(Db, Doc, Options) -> + case update_docs(Db, [Doc], Options) of + {ok, [{ok, NewRev}]} -> + {ok, NewRev}; + {ok, [{{_Id, _Rev}, Error}]} -> + throw(Error); + {error, [{{_Id, _Rev}, Error}]} -> + throw(Error); + {error, [Error]} -> + throw(Error); + {ok, []} -> + % replication success + {Pos, [RevId | _]} = Doc#doc.revs, + {ok, {Pos, RevId}} + end. + + +update_docs(Db, Docs) -> + update_docs(Db, Docs, []). + + +update_docs(Db, Docs0, Options) -> + Docs1 = apply_before_doc_update(Db, Docs0, Options), + try + validate_atomic_update(Docs0, lists:member(all_or_nothing, Options)), + + Resps0 = batch_update_docs(Db, Docs1, Options), + + % Notify index builder + fabric2_index:db_updated(name(Db)), + + % Convert errors + Resps1 = lists:map(fun(Resp) -> + case Resp of + {#doc{} = Doc, Error} -> + #doc{ + id = DocId, + revs = Revs + } = Doc, + RevId = case Revs of + {RevPos, [Rev | _]} -> {RevPos, Rev}; + {0, []} -> {0, <<>>}; + Else -> Else + end, + {{DocId, RevId}, Error}; + Else -> + Else + end + end, Resps0), + case is_replicated(Options) of + true -> + {ok, lists:flatmap(fun(R) -> + case R of + {ok, []} -> []; + {{_, _}, {ok, []}} -> []; + Else -> [Else] + end + end, Resps1)}; + false -> + {ok, Resps1} + end + catch throw:{aborted, Errors} -> + {aborted, Errors} + end. + + +read_attachment(Db, DocId, AttId) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:read_attachment(TxDb, DocId, AttId) + end). + + +write_attachment(Db, DocId, Att) -> + Data = couch_att:fetch(data, Att), + Encoding = couch_att:fetch(encoding, Att), + {ok, AttId} = fabric2_fdb:write_attachment(Db, DocId, Data, Encoding), + couch_att:store(data, {loc, Db, DocId, AttId}, Att). + + +fold_docs(Db, UserFun, UserAcc) -> + fold_docs(Db, UserFun, UserAcc, []). + + +fold_docs(Db, UserFun, UserAcc0, Options) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + try + #{ + db_prefix := DbPrefix + } = TxDb, + + Prefix = erlfdb_tuple:pack({?DB_ALL_DOCS}, DbPrefix), + Meta = get_all_docs_meta(TxDb, Options), + + UserAcc1 = maybe_stop(UserFun({meta, Meta}, UserAcc0)), + + UserAcc2 = fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) -> + {DocId} = erlfdb_tuple:unpack(K, Prefix), + RevId = erlfdb_tuple:unpack(V), + Row0 = [ + {id, DocId}, + {key, DocId}, + {value, {[{rev, couch_doc:rev_to_str(RevId)}]}} + ], + + DocOpts = couch_util:get_value(doc_opts, Options, []), + OpenOpts = [deleted | DocOpts], + + Row1 = case lists:keyfind(include_docs, 1, Options) of + {include_docs, true} -> + Row0 ++ open_json_doc(TxDb, DocId, OpenOpts, DocOpts); + _ -> + Row0 + end, + + maybe_stop(UserFun({row, Row1}, Acc)) + end, UserAcc1, Options), + + {ok, maybe_stop(UserFun(complete, UserAcc2))} + catch throw:{stop, FinalUserAcc} -> + {ok, FinalUserAcc} + end + end). + + +fold_docs(Db, DocIds, UserFun, UserAcc0, Options) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + try + NeedsTreeOpts = [revs_info, conflicts, deleted_conflicts], + NeedsTree = (Options -- NeedsTreeOpts /= Options), + + InitAcc = #{ + revs_q => queue:new(), + revs_count => 0, + body_q => queue:new(), + body_count => 0, + doc_opts => Options, + user_acc => UserAcc0, + user_fun => UserFun + }, + + FinalAcc1 = lists:foldl(fun(DocId, Acc) -> + #{ + revs_q := RevsQ, + revs_count := RevsCount + } = Acc, + Future = fold_docs_get_revs(TxDb, DocId, NeedsTree), + NewAcc = Acc#{ + revs_q := queue:in({DocId, Future}, RevsQ), + revs_count := RevsCount + 1 + }, + drain_fold_docs_revs_futures(TxDb, NewAcc) + end, InitAcc, DocIds), + + FinalAcc2 = drain_all_fold_docs_revs_futures(TxDb, FinalAcc1), + FinalAcc3 = drain_all_fold_docs_body_futures(TxDb, FinalAcc2), + + #{ + user_acc := FinalUserAcc + } = FinalAcc3, + {ok, FinalUserAcc} + + catch throw:{stop, StopUserAcc} -> + {ok, StopUserAcc} + end + end). + + + + +fold_design_docs(Db, UserFun, UserAcc0, Options1) -> + Options2 = set_design_doc_keys(Options1), + fold_docs(Db, UserFun, UserAcc0, Options2). + + +fold_local_docs(Db, UserFun, UserAcc0, Options0) -> + % This is mostly for testing and sanity checking. When calling from a test + % namespace will be automatically set. We also assert when called from the + % API the correct namespace was set + Options = case lists:keyfind(namespace, 1, Options0) of + {namespace, <<"_local">>} -> Options0; + false -> [{namespace, <<"_local">>} | Options0] + end, + fabric2_fdb:transactional(Db, fun(TxDb) -> + try + #{ + db_prefix := DbPrefix + } = TxDb, + + Prefix = erlfdb_tuple:pack({?DB_LOCAL_DOCS}, DbPrefix), + Meta = get_all_docs_meta(TxDb, Options), + + UserAcc1 = maybe_stop(UserFun({meta, Meta}, UserAcc0)), + + UserAcc2 = fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) -> + {DocId} = erlfdb_tuple:unpack(K, Prefix), + Rev = fabric2_fdb:get_local_doc_rev(TxDb, DocId, V), + maybe_stop(UserFun({row, [ + {id, DocId}, + {key, DocId}, + {value, {[{rev, couch_doc:rev_to_str({0, Rev})}]}} + ]}, Acc)) + end, UserAcc1, Options), + + {ok, maybe_stop(UserFun(complete, UserAcc2))} + catch throw:{stop, FinalUserAcc} -> + {ok, FinalUserAcc} + end + end). + + +fold_changes(Db, SinceSeq, UserFun, UserAcc) -> + fold_changes(Db, SinceSeq, UserFun, UserAcc, []). + + +fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + try + #{ + db_prefix := DbPrefix + } = TxDb, + + Prefix = erlfdb_tuple:pack({?DB_CHANGES}, DbPrefix), + + Dir = case fabric2_util:get_value(dir, Options, fwd) of + rev -> rev; + _ -> fwd + end, + + RestartTx = case fabric2_util:get_value(restart_tx, Options) of + undefined -> [{restart_tx, true}]; + _AlreadySet -> [] + end, + + StartKey = get_since_seq(TxDb, Dir, SinceSeq), + EndKey = case Dir of + rev -> fabric2_util:seq_zero_vs(); + _ -> fabric2_util:seq_max_vs() + end, + FoldOpts = [ + {start_key, StartKey}, + {end_key, EndKey} + ] ++ RestartTx ++ Options, + + {ok, fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) -> + {SeqVS} = erlfdb_tuple:unpack(K, Prefix), + {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V), + + Change = #{ + id => DocId, + sequence => fabric2_fdb:vs_to_seq(SeqVS), + rev_id => RevId, + deleted => Deleted + }, + + maybe_stop(UserFun(Change, Acc)) + end, UserAcc, FoldOpts)} + catch throw:{stop, FinalUserAcc} -> + {ok, FinalUserAcc} + end + end). + + +dbname_suffix(DbName) -> + filename:basename(normalize_dbname(DbName)). + + +normalize_dbname(DbName) -> + % Remove in the final cleanup. We don't need to handle shards prefix or + % remove .couch suffixes anymore. Keep it for now to pass all the existing + % tests. + couch_db:normalize_dbname(DbName). + + +validate_dbname(DbName) when is_list(DbName) -> + validate_dbname(?l2b(DbName)); + +validate_dbname(DbName) when is_binary(DbName) -> + Normalized = normalize_dbname(DbName), + fabric2_db_plugin:validate_dbname( + DbName, Normalized, fun validate_dbname_int/2). + +validate_dbname_int(DbName, Normalized) when is_binary(DbName) -> + case validate_dbname_length(DbName) of + ok -> validate_dbname_pat(DbName, Normalized); + {error, _} = Error -> Error + end. + + +validate_dbname_length(DbName) -> + MaxLength = config:get_integer("couchdb", "max_database_name_length", + ?DEFAULT_MAX_DATABASE_NAME_LENGTH), + case byte_size(DbName) =< MaxLength of + true -> ok; + false -> {error, {database_name_too_long, DbName}} + end. + + +validate_dbname_pat(DbName, Normalized) -> + DbNoExt = couch_util:drop_dot_couch_ext(DbName), + case re:run(DbNoExt, ?DBNAME_REGEX, [{capture,none}, dollar_endonly]) of + match -> + ok; + nomatch -> + case is_system_db_name(Normalized) of + true -> ok; + false -> {error, {illegal_database_name, DbName}} + end + end. + + +maybe_add_sys_db_callbacks(Db) -> + IsReplicatorDb = is_replicator_db(Db), + IsUsersDb = is_users_db(Db), + + {BDU, ADR} = if + IsReplicatorDb -> + { + fun couch_replicator_docs:before_doc_update/3, + fun couch_replicator_docs:after_doc_read/2 + }; + IsUsersDb -> + { + fun fabric2_users_db:before_doc_update/3, + fun fabric2_users_db:after_doc_read/2 + }; + true -> + {undefined, undefined} + end, + + Db#{ + before_doc_update := BDU, + after_doc_read := ADR + }. + + +make_db_info(DbName, Props) -> + BaseProps = [ + {cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}}, + {compact_running, false}, + {data_size, 0}, + {db_name, DbName}, + {disk_format_version, 0}, + {disk_size, 0}, + {instance_start_time, <<"0">>}, + {purge_seq, 0} + ], + + lists:foldl(fun({Key, Val}, Acc) -> + lists:keystore(Key, 1, Acc, {Key, Val}) + end, BaseProps, Props). + + +drain_info_futures(FutureQ, Count, _UserFun, Acc) when Count < 100 -> + {FutureQ, Count, Acc}; + +drain_info_futures(FutureQ, Count, UserFun, Acc) when Count >= 100 -> + {{value, {DbName, Future}}, RestQ} = queue:out(FutureQ), + InfoProps = fabric2_fdb:get_info_wait(Future), + DbInfo = make_db_info(DbName, InfoProps), + NewAcc = maybe_stop(UserFun({row, DbInfo}, Acc)), + {RestQ, Count - 1, NewAcc}. + + +drain_all_info_futures(FutureQ, UserFun, Acc) -> + case queue:out(FutureQ) of + {{value, {DbName, Future}}, RestQ} -> + InfoProps = fabric2_fdb:get_info_wait(Future), + DbInfo = make_db_info(DbName, InfoProps), + NewAcc = maybe_stop(UserFun({row, DbInfo}, Acc)), + drain_all_info_futures(RestQ, UserFun, NewAcc); + {empty, _} -> + Acc + end. + + +drain_deleted_info_futures(FutureQ, Count, _UserFun, Acc) when Count < 100 -> + {FutureQ, Count, Acc}; + +drain_deleted_info_futures(FutureQ, Count, UserFun, Acc) when Count >= 100 -> + {{value, {DbName, TimeStamp, Future}}, RestQ} = queue:out(FutureQ), + BaseProps = fabric2_fdb:get_info_wait(Future), + DeletedProps = BaseProps ++ [ + {deleted, true}, + {timestamp, TimeStamp} + ], + DbInfo = make_db_info(DbName, DeletedProps), + NewAcc = maybe_stop(UserFun({row, DbInfo}, Acc)), + {RestQ, Count - 1, NewAcc}. + + +drain_all_deleted_info_futures(FutureQ, UserFun, Acc) -> + case queue:out(FutureQ) of + {{value, {DbName, TimeStamp, Future}}, RestQ} -> + BaseProps = fabric2_fdb:get_info_wait(Future), + DeletedProps = BaseProps ++ [ + {deleted, true}, + {timestamp, TimeStamp} + ], + DbInfo = make_db_info(DbName, DeletedProps), + NewAcc = maybe_stop(UserFun({row, DbInfo}, Acc)), + drain_all_deleted_info_futures(RestQ, UserFun, NewAcc); + {empty, _} -> + Acc + end. + + +fold_docs_get_revs(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, _) -> + fabric2_fdb:get_local_doc_rev_future(Db, DocId); + +fold_docs_get_revs(Db, DocId, true) -> + fabric2_fdb:get_all_revs_future(Db, DocId); + +fold_docs_get_revs(Db, DocId, false) -> + fabric2_fdb:get_winning_revs_future(Db, DocId, 1). + + +fold_docs_get_revs_wait(_Db, <<?LOCAL_DOC_PREFIX, _/binary>>, RevsFuture) -> + Rev = fabric2_fdb:get_local_doc_rev_wait(RevsFuture), + [Rev]; + +fold_docs_get_revs_wait(Db, _DocId, RevsFuture) -> + fabric2_fdb:get_revs_wait(Db, RevsFuture). + + +fold_docs_get_doc_body_future(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, + [Rev]) -> + fabric2_fdb:get_local_doc_body_future(Db, DocId, Rev); + +fold_docs_get_doc_body_future(Db, DocId, Revs) -> + Winner = get_rev_winner(Revs), + fabric2_fdb:get_doc_body_future(Db, DocId, Winner). + + +fold_docs_get_doc_body_wait(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, [Rev], + _DocOpts, BodyFuture) -> + case fabric2_fdb:get_local_doc_body_wait(Db, DocId, Rev, BodyFuture) of + {not_found, missing} -> {not_found, missing}; + Doc -> {ok, Doc} + end; + +fold_docs_get_doc_body_wait(Db, DocId, Revs, DocOpts, BodyFuture) -> + RevInfo = get_rev_winner(Revs), + Base = fabric2_fdb:get_doc_body_wait(Db, DocId, RevInfo, + BodyFuture), + apply_open_doc_opts(Base, Revs, DocOpts). + + +drain_fold_docs_revs_futures(_TxDb, #{revs_count := C} = Acc) when C < 100 -> + Acc; +drain_fold_docs_revs_futures(TxDb, Acc) -> + drain_one_fold_docs_revs_future(TxDb, Acc). + + +drain_all_fold_docs_revs_futures(_TxDb, #{revs_count := C} = Acc) when C =< 0 -> + Acc; +drain_all_fold_docs_revs_futures(TxDb, #{revs_count := C} = Acc) when C > 0 -> + NewAcc = drain_one_fold_docs_revs_future(TxDb, Acc), + drain_all_fold_docs_revs_futures(TxDb, NewAcc). + + +drain_one_fold_docs_revs_future(TxDb, Acc) -> + #{ + revs_q := RevsQ, + revs_count := RevsCount, + body_q := BodyQ, + body_count := BodyCount + } = Acc, + {{value, {DocId, RevsFuture}}, RestRevsQ} = queue:out(RevsQ), + + Revs = fold_docs_get_revs_wait(TxDb, DocId, RevsFuture), + DocFuture = case Revs of + [] -> + {DocId, [], not_found}; + [_ | _] -> + BodyFuture = fold_docs_get_doc_body_future(TxDb, DocId, Revs), + {DocId, Revs, BodyFuture} + end, + NewAcc = Acc#{ + revs_q := RestRevsQ, + revs_count := RevsCount - 1, + body_q := queue:in(DocFuture, BodyQ), + body_count := BodyCount + 1 + }, + drain_fold_docs_body_futures(TxDb, NewAcc). + + +drain_fold_docs_body_futures(_TxDb, #{body_count := C} = Acc) when C < 100 -> + Acc; +drain_fold_docs_body_futures(TxDb, Acc) -> + drain_one_fold_docs_body_future(TxDb, Acc). + + +drain_all_fold_docs_body_futures(_TxDb, #{body_count := C} = Acc) when C =< 0 -> + Acc; +drain_all_fold_docs_body_futures(TxDb, #{body_count := C} = Acc) when C > 0 -> + NewAcc = drain_one_fold_docs_body_future(TxDb, Acc), + drain_all_fold_docs_body_futures(TxDb, NewAcc). + + +drain_one_fold_docs_body_future(TxDb, Acc) -> + #{ + body_q := BodyQ, + body_count := BodyCount, + doc_opts := DocOpts, + user_fun := UserFun, + user_acc := UserAcc + } = Acc, + {{value, {DocId, Revs, BodyFuture}}, RestBodyQ} = queue:out(BodyQ), + Doc = case BodyFuture of + not_found -> + {not_found, missing}; + _ -> + fold_docs_get_doc_body_wait(TxDb, DocId, Revs, DocOpts, BodyFuture) + end, + NewUserAcc = maybe_stop(UserFun(DocId, Doc, UserAcc)), + Acc#{ + body_q := RestBodyQ, + body_count := BodyCount - 1, + user_acc := NewUserAcc + }. + + +get_rev_winner(Revs) -> + [Winner] = lists:filter(fun(Rev) -> + maps:get(winner, Rev) + end, Revs), + Winner. + + +new_revid(Db, Doc) -> + #doc{ + id = DocId, + body = Body, + revs = {OldStart, OldRevs}, + atts = Atts, + deleted = Deleted + } = Doc, + + {NewAtts, AttSigInfo} = lists:mapfoldl(fun(Att, Acc) -> + [Name, Type, Data, Md5] = couch_att:fetch([name, type, data, md5], Att), + case Data of + {loc, _, _, _} -> + {Att, [{Name, Type, Md5} | Acc]}; + _ -> + Att1 = couch_att:flush(Db, DocId, Att), + Att2 = couch_att:store(revpos, OldStart + 1, Att1), + {Att2, [{Name, Type, couch_att:fetch(md5, Att2)} | Acc]} + end + end, [], Atts), + + Rev = case length(Atts) == length(AttSigInfo) of + true -> + OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end, + SigTerm = [Deleted, OldStart, OldRev, Body, AttSigInfo], + couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}])); + false -> + erlang:error(missing_att_info) + end, + + Doc#doc{ + revs = {OldStart + 1, [Rev | OldRevs]}, + atts = NewAtts + }. + + +get_all_docs_meta(TxDb, Options) -> + NS = couch_util:get_value(namespace, Options), + DocCount = get_doc_count(TxDb, NS), + case lists:keyfind(update_seq, 1, Options) of + {_, true} -> + UpdateSeq = fabric2_db:get_update_seq(TxDb), + [{update_seq, UpdateSeq}]; + _ -> + [] + end ++ [{total, DocCount}, {offset, null}]. + + +maybe_set_interactive(#{} = Db, Options) -> + Interactive = fabric2_util:get_value(interactive, Options, false), + Db#{interactive := Interactive}. + + +maybe_set_user_ctx(Db, Options) -> + case fabric2_util:get_value(user_ctx, Options) of + #user_ctx{} = UserCtx -> + set_user_ctx(Db, UserCtx); + undefined -> + Db + end. + + +is_member(Db, {SecProps}) when is_list(SecProps) -> + case is_admin(Db, {SecProps}) of + true -> + true; + false -> + case is_public_db(SecProps) of + true -> + true; + false -> + {Members} = get_members(SecProps), + UserCtx = get_user_ctx(Db), + is_authorized(Members, UserCtx) + end + end. + + +is_authorized(Group, UserCtx) -> + #user_ctx{ + name = UserName, + roles = UserRoles + } = UserCtx, + Names = fabric2_util:get_value(<<"names">>, Group, []), + Roles = fabric2_util:get_value(<<"roles">>, Group, []), + case check_security(roles, UserRoles, [<<"_admin">> | Roles]) of + true -> + true; + false -> + check_security(names, UserName, Names) + end. + + +check_security(roles, [], _) -> + false; +check_security(roles, UserRoles, Roles) -> + UserRolesSet = ordsets:from_list(UserRoles), + RolesSet = ordsets:from_list(Roles), + not ordsets:is_disjoint(UserRolesSet, RolesSet); +check_security(names, _, []) -> + false; +check_security(names, null, _) -> + false; +check_security(names, UserName, Names) -> + lists:member(UserName, Names). + + +throw_security_error(#user_ctx{name = null} = UserCtx) -> + Reason = <<"You are not authorized to access this db.">>, + throw_security_error(UserCtx, Reason); +throw_security_error(#user_ctx{name = _} = UserCtx) -> + Reason = <<"You are not allowed to access this db.">>, + throw_security_error(UserCtx, Reason). + + +throw_security_error(#user_ctx{} = UserCtx, Reason) -> + Error = security_error_type(UserCtx), + throw({Error, Reason}). + + +security_error_type(#user_ctx{name = null}) -> + unauthorized; +security_error_type(#user_ctx{name = _}) -> + forbidden. + + +is_public_db(SecProps) -> + {Members} = get_members(SecProps), + Names = fabric2_util:get_value(<<"names">>, Members, []), + Roles = fabric2_util:get_value(<<"roles">>, Members, []), + Names =:= [] andalso Roles =:= []. + + +get_admins(SecProps) -> + fabric2_util:get_value(<<"admins">>, SecProps, {[]}). + + +get_members(SecProps) -> + % we fallback to readers here for backwards compatibility + case fabric2_util:get_value(<<"members">>, SecProps) of + undefined -> + fabric2_util:get_value(<<"readers">>, SecProps, {[]}); + Members -> + Members + end. + + +apply_open_doc_opts(Doc0, Revs, Options) -> + IncludeRevsInfo = lists:member(revs_info, Options), + IncludeConflicts = lists:member(conflicts, Options), + IncludeDelConflicts = lists:member(deleted_conflicts, Options), + IncludeLocalSeq = lists:member(local_seq, Options), + + % This revs_info becomes fairly useless now that we're + % not keeping old document bodies around... + Meta1 = if not IncludeRevsInfo -> []; true -> + {Pos, [Rev | RevPath]} = Doc0#doc.revs, + RevPathMissing = lists:map(fun(R) -> {R, missing} end, RevPath), + [{revs_info, Pos, [{Rev, available} | RevPathMissing]}] + end, + + Meta2 = if not IncludeConflicts -> []; true -> + Conflicts = [RI || RI = #{winner := false, deleted := false} <- Revs], + if Conflicts == [] -> []; true -> + ConflictRevs = [maps:get(rev_id, RI) || RI <- Conflicts], + [{conflicts, ConflictRevs}] + end + end, + + Meta3 = if not IncludeDelConflicts -> []; true -> + DelConflicts = [RI || RI = #{winner := false, deleted := true} <- Revs], + if DelConflicts == [] -> []; true -> + DelConflictRevs = [maps:get(rev_id, RI) || RI <- DelConflicts], + [{deleted_conflicts, DelConflictRevs}] + end + end, + + Meta4 = if not IncludeLocalSeq -> []; true -> + #{winner := true, sequence := SeqVS} = lists:last(Revs), + [{local_seq, fabric2_fdb:vs_to_seq(SeqVS)}] + end, + + Doc1 = case lists:keyfind(atts_since, 1, Options) of + {_, PossibleAncestors} -> + #doc{ + revs = DocRevs, + atts = Atts0 + } = Doc0, + RevPos = find_ancestor_rev_pos(DocRevs, PossibleAncestors), + Atts1 = lists:map(fun(Att) -> + [AttPos, Data] = couch_att:fetch([revpos, data], Att), + if AttPos > RevPos -> couch_att:store(data, Data, Att); + true -> couch_att:store(data, stub, Att) + end + end, Atts0), + Doc0#doc{atts = Atts1}; + false -> + Doc0 + end, + + {ok, Doc1#doc{meta = Meta1 ++ Meta2 ++ Meta3 ++ Meta4}}. + + +find_ancestor_rev_pos({_, []}, _PossibleAncestors) -> + 0; +find_ancestor_rev_pos(_DocRevs, []) -> + 0; +find_ancestor_rev_pos({RevPos, [RevId | Rest]}, AttsSinceRevs) -> + case lists:member({RevPos, RevId}, AttsSinceRevs) of + true -> RevPos; + false -> find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs) + end. + + +filter_found_revs(RevInfo, Revs) -> + #{ + rev_id := {Pos, Rev}, + rev_path := RevPath + } = RevInfo, + FullRevPath = [Rev | RevPath], + lists:flatmap(fun({FindPos, FindRev} = RevIdToFind) -> + if FindPos > Pos -> [RevIdToFind]; true -> + % Add 1 because lists:nth is 1 based + Idx = Pos - FindPos + 1, + case Idx > length(FullRevPath) of + true -> + [RevIdToFind]; + false -> + case lists:nth(Idx, FullRevPath) == FindRev of + true -> []; + false -> [RevIdToFind] + end + end + end + end, Revs). + + +find_possible_ancestors(RevInfos, MissingRevs) -> + % Find any revinfos that are possible ancestors + % of the missing revs. A possible ancestor is + % any rev that has a start position less than + % any missing revision. Stated alternatively, + % find any revinfo that could theoretically + % extended to be one or more of the missing + % revisions. + % + % Since we are looking at any missing revision + % we can just compare against the maximum missing + % start position. + MaxMissingPos = case MissingRevs of + [] -> 0; + [_ | _] -> lists:max([Start || {Start, _Rev} <- MissingRevs]) + end, + lists:flatmap(fun(RevInfo) -> + #{rev_id := {RevPos, _} = RevId} = RevInfo, + case RevPos < MaxMissingPos of + true -> [RevId]; + false -> [] + end + end, RevInfos). + + +apply_before_doc_update(Db, Docs, Options) -> + UpdateType = case lists:member(replicated_changes, Options) of + true -> replicated_changes; + false -> interactive_edit + end, + lists:map(fun(Doc) -> + fabric2_db_plugin:before_doc_update(Db, Doc, UpdateType) + end, Docs). + + +update_doc_int(#{} = Db, #doc{} = Doc, Options) -> + IsLocal = case Doc#doc.id of + <<?LOCAL_DOC_PREFIX, _/binary>> -> true; + _ -> false + end, + try + case {IsLocal, is_replicated(Options)} of + {false, false} -> update_doc_interactive(Db, Doc, Options); + {false, true} -> update_doc_replicated(Db, Doc, Options); + {true, _} -> update_local_doc(Db, Doc, Options) + end + catch throw:{?MODULE, Return} -> + Return + end. + + +batch_update_docs(Db, Docs, Options) -> + BAcc = #bacc{ + db = Db, + docs = Docs, + batch_size = get_batch_size(Options), + options = Options, + rev_futures = #{}, + seen = [], + results = [] + }, + #bacc{results = Res} = batch_update_docs(BAcc), + lists:reverse(Res). + + +batch_update_docs(#bacc{docs = []} = BAcc) -> + BAcc; + +batch_update_docs(#bacc{db = Db} = BAcc) -> + #bacc{ + db = Db, + docs = Docs, + options = Options + } = BAcc, + + BAccTx2 = fabric2_fdb:transactional(Db, fun(TxDb) -> + BAccTx = BAcc#bacc{db = TxDb}, + case is_replicated(Options) of + false -> + Tagged = tag_docs(Docs), + RevFutures = get_winning_rev_futures(TxDb, Tagged), + BAccTx1 = BAccTx#bacc{ + docs = Tagged, + rev_futures = RevFutures + }, + batch_update_interactive_tx(BAccTx1); + true -> + BAccTx1 = batch_update_replicated_tx(BAccTx), + % For replicated updates reset `seen` after every transaction + BAccTx1#bacc{seen = []} + end + end), + + % Clean up after the transaction ends so we can recurse with a clean state + maps:map(fun(Tag, RangeFuture) when is_reference(Tag) -> + ok = erlfdb:cancel(RangeFuture, [flush]) + end, BAccTx2#bacc.rev_futures), + + BAcc1 = BAccTx2#bacc{ + db = Db, + rev_futures = #{} + }, + + batch_update_docs(BAcc1). + + +batch_update_interactive_tx(#bacc{docs = []} = BAcc) -> + BAcc; + +batch_update_interactive_tx(#bacc{} = BAcc) -> + #bacc{ + db = TxDb, + docs = [Doc | Docs], + options = Options, + batch_size = MaxSize, + rev_futures = RevFutures, + seen = Seen, + results = Results + } = BAcc, + {Res, Seen1} = try + update_docs_interactive(TxDb, Doc, Options, RevFutures, Seen) + catch throw:{?MODULE, Return} -> + {Return, Seen} + end, + BAcc1 = BAcc#bacc{ + docs = Docs, + results = [Res | Results], + seen = Seen1 + }, + case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of + true -> BAcc1; + false -> batch_update_interactive_tx(BAcc1) + end. + + +batch_update_replicated_tx(#bacc{docs = []} = BAcc) -> + BAcc; + +batch_update_replicated_tx(#bacc{} = BAcc) -> + #bacc{ + db = TxDb, + docs = [Doc | Docs], + options = Options, + batch_size = MaxSize, + seen = Seen, + results = Results + } = BAcc, + case lists:member(Doc#doc.id, Seen) of + true -> + % If we already updated this doc in the current transaction, wait + % till the next transaction to update it again. + BAcc; + false -> + Res = update_doc_int(TxDb, Doc, Options), + BAcc1 = BAcc#bacc{ + docs = Docs, + results = [Res | Results], + seen = [Doc#doc.id | Seen] + }, + case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of + true -> BAcc1; + false -> batch_update_replicated_tx(BAcc1) + end + end. + + +update_docs_interactive(Db, #doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} = Doc, + Options, _Futures, SeenIds) -> + {update_local_doc(Db, Doc, Options), SeenIds}; + +update_docs_interactive(Db, Doc, Options, Futures, SeenIds) -> + case lists:member(Doc#doc.id, SeenIds) of + true -> + {conflict, SeenIds}; + false -> + Future = maps:get(doc_tag(Doc), Futures), + case update_doc_interactive(Db, Doc, Future, Options) of + {ok, _} = Resp -> + {Resp, [Doc#doc.id | SeenIds]}; + _ = Resp -> + {Resp, SeenIds} + end + end. + + +update_doc_interactive(Db, Doc0, Options) -> + % Get the current winning revision. This is needed + % regardless of which branch we're updating. The extra + % revision we're grabbing is an optimization to + % save us a round trip if we end up deleting + % the winning revision branch. + NumRevs = if Doc0#doc.deleted -> 2; true -> 1 end, + Future = fabric2_fdb:get_winning_revs_future(Db, Doc0#doc.id, NumRevs), + update_doc_interactive(Db, Doc0, Future, Options). + + +update_doc_interactive(Db, Doc0, Future, _Options) -> + RevInfos = fabric2_fdb:get_revs_wait(Db, Future), + {Winner, SecondPlace} = case RevInfos of + [] -> {not_found, not_found}; + [WRI] -> {WRI, not_found}; + [WRI, SPRI] -> {WRI, SPRI} + end, + WinnerRevId = case Winner of + not_found -> + {0, <<>>}; + _ -> + case maps:get(deleted, Winner) of + true -> {0, <<>>}; + false -> maps:get(rev_id, Winner) + end + end, + + % Check that a revision was specified if required + Doc0RevId = doc_to_revid(Doc0), + HasRev = Doc0RevId =/= {0, <<>>}, + if HasRev orelse WinnerRevId == {0, <<>>} -> ok; true -> + ?RETURN({Doc0, conflict}) + end, + + % Allow inserting new deleted documents. Only works when the document has + % never existed to match CouchDB 3.x + case not HasRev andalso Doc0#doc.deleted andalso is_map(Winner) of + true -> ?RETURN({Doc0, conflict}); + false -> ok + end, + + % Get the target revision to update + Target = case Doc0RevId == WinnerRevId of + true -> + Winner; + false -> + case fabric2_fdb:get_non_deleted_rev(Db, Doc0#doc.id, Doc0RevId) of + #{deleted := false} = Target0 -> + Target0; + not_found -> + % Either a missing revision or a deleted + % revision. Either way a conflict. Note + % that we get not_found for a deleted revision + % because we only check for the non-deleted + % key in fdb + ?RETURN({Doc0, conflict}) + end + end, + + Doc1 = case Winner of + #{deleted := true} when not Doc0#doc.deleted -> + % When recreating a deleted document we want to extend + % the winning revision branch rather than create a + % new branch. If we did not do this we could be + % recreating into a state that previously existed. + Doc0#doc{revs = fabric2_util:revinfo_to_revs(Winner)}; + #{} -> + % Otherwise we're extending the target's revision + % history with this update + Doc0#doc{revs = fabric2_util:revinfo_to_revs(Target)}; + not_found -> + % Creating a new doc means our revs start empty + Doc0 + end, + + % Validate the doc update and create the + % new revinfo map + Doc2 = prep_and_validate(Db, Doc1, Target), + + Doc3 = new_revid(Db, Doc2), + + #doc{ + deleted = NewDeleted, + revs = {NewRevPos, [NewRev | NewRevPath]}, + atts = Atts + } = Doc4 = stem_revisions(Db, Doc3), + + NewRevInfo = #{ + winner => undefined, + exists => false, + deleted => NewDeleted, + rev_id => {NewRevPos, NewRev}, + rev_path => NewRevPath, + sequence => undefined, + branch_count => undefined, + att_hash => fabric2_util:hash_atts(Atts), + rev_size => fabric2_util:rev_size(Doc4) + }, + + % Gather the list of possible winnig revisions + Possible = case Target == Winner of + true when not Doc4#doc.deleted -> + [NewRevInfo]; + true when Doc4#doc.deleted -> + case SecondPlace of + #{} -> [NewRevInfo, SecondPlace]; + not_found -> [NewRevInfo] + end; + false -> + [NewRevInfo, Winner] + end, + + % Sort the rev infos such that the winner is first + {NewWinner0, NonWinner} = case fabric2_util:sort_revinfos(Possible) of + [W] -> {W, not_found}; + [W, NW] -> {W, NW} + end, + + BranchCount = case Winner of + not_found -> 1; + #{branch_count := BC} -> BC + end, + NewWinner = NewWinner0#{branch_count := BranchCount}, + ToUpdate = if NonWinner == not_found -> []; true -> [NonWinner] end, + ToRemove = if Target == not_found -> []; true -> [Target] end, + + ok = fabric2_fdb:write_doc( + Db, + Doc4, + NewWinner, + Winner, + ToUpdate, + ToRemove + ), + + {ok, {NewRevPos, NewRev}}. + + +update_doc_replicated(Db, Doc0, _Options) -> + #doc{ + id = DocId, + deleted = Deleted, + revs = {RevPos, [Rev | RevPath]} + } = Doc0, + + DocRevInfo0 = #{ + winner => undefined, + exists => false, + deleted => Deleted, + rev_id => {RevPos, Rev}, + rev_path => RevPath, + sequence => undefined, + branch_count => undefined, + att_hash => <<>>, + rev_size => null + }, + + AllRevInfos = fabric2_fdb:get_all_revs(Db, DocId), + + RevTree = lists:foldl(fun(RI, TreeAcc) -> + RIPath = fabric2_util:revinfo_to_path(RI), + {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath), + Merged + end, [], AllRevInfos), + + DocRevPath = fabric2_util:revinfo_to_path(DocRevInfo0), + + {NewTree, Status} = couch_key_tree:merge(RevTree, DocRevPath), + if Status /= internal_node -> ok; true -> + % We already know this revision so nothing + % left to do. + ?RETURN({Doc0, {ok, []}}) + end, + + % Its possible to have a replication with fewer than $revs_limit + % revisions which extends an existing branch. To avoid + % losing revision history we extract the new node from the + % tree and use the combined path after stemming. + {[{_, {RevPos, UnstemmedRevs}}], []} + = couch_key_tree:get(NewTree, [{RevPos, Rev}]), + + Doc1 = stem_revisions(Db, Doc0#doc{revs = {RevPos, UnstemmedRevs}}), + + {RevPos, [Rev | NewRevPath]} = Doc1#doc.revs, + DocRevInfo1 = DocRevInfo0#{rev_path := NewRevPath}, + + % Find any previous revision we knew about for + % validation and attachment handling. + AllLeafsFull = couch_key_tree:get_all_leafs_full(NewTree), + LeafPath = get_leaf_path(RevPos, Rev, AllLeafsFull), + PrevRevInfo = find_prev_revinfo(RevPos, LeafPath), + Doc2 = prep_and_validate(Db, Doc1, PrevRevInfo), + Doc3 = flush_doc_atts(Db, Doc2), + DocRevInfo2 = DocRevInfo1#{ + atts_hash => fabric2_util:hash_atts(Doc3#doc.atts), + rev_size => fabric2_util:rev_size(Doc3) + }, + + % Possible winners are the previous winner and + % the new DocRevInfo + Winner = case fabric2_util:sort_revinfos(AllRevInfos) of + [#{winner := true} = WRI | _] -> WRI; + [] -> not_found + end, + {NewWinner0, NonWinner} = case Winner == PrevRevInfo of + true -> + {DocRevInfo2, not_found}; + false -> + [W, NW] = fabric2_util:sort_revinfos([Winner, DocRevInfo2]), + {W, NW} + end, + + NewWinner = NewWinner0#{branch_count := length(AllLeafsFull)}, + ToUpdate = if NonWinner == not_found -> []; true -> [NonWinner] end, + ToRemove = if PrevRevInfo == not_found -> []; true -> [PrevRevInfo] end, + + ok = fabric2_fdb:write_doc( + Db, + Doc3, + NewWinner, + Winner, + ToUpdate, + ToRemove + ), + + {ok, []}. + + +update_local_doc(Db, Doc0, _Options) -> + Doc1 = case increment_local_doc_rev(Doc0) of + {ok, Updated} -> Updated; + {error, Error} -> ?RETURN({Doc0, Error}) + end, + + ok = fabric2_fdb:write_local_doc(Db, Doc1), + + #doc{revs = {0, [Rev]}} = Doc1, + {ok, {0, integer_to_binary(Rev)}}. + + +flush_doc_atts(Db, Doc) -> + #doc{ + id = DocId, + atts = Atts + } = Doc, + NewAtts = lists:map(fun(Att) -> + case couch_att:fetch(data, Att) of + {loc, _, _, _} -> + Att; + _ -> + couch_att:flush(Db, DocId, Att) + end + end, Atts), + Doc#doc{atts = NewAtts}. + + +get_winning_rev_futures(Db, Docs) -> + lists:foldl(fun(Doc, Acc) -> + #doc{ + id = DocId, + deleted = Deleted + } = Doc, + IsLocal = case DocId of + <<?LOCAL_DOC_PREFIX, _/binary>> -> true; + _ -> false + end, + if IsLocal -> Acc; true -> + NumRevs = if Deleted -> 2; true -> 1 end, + Future = fabric2_fdb:get_winning_revs_future(Db, DocId, NumRevs), + DocTag = doc_tag(Doc), + Acc#{DocTag => Future} + end + end, #{}, Docs). + + +prep_and_validate(Db, NewDoc, PrevRevInfo) -> + HasStubs = couch_doc:has_stubs(NewDoc), + HasVDUs = [] /= maps:get(validate_doc_update_funs, Db), + IsDDoc = case NewDoc#doc.id of + <<?DESIGN_DOC_PREFIX, _/binary>> -> true; + _ -> false + end, + + WasDeleted = case PrevRevInfo of + not_found -> false; + #{deleted := D} -> D + end, + + PrevDoc = case HasStubs orelse (HasVDUs and not IsDDoc) of + true when PrevRevInfo /= not_found, not WasDeleted -> + case fabric2_fdb:get_doc_body(Db, NewDoc#doc.id, PrevRevInfo) of + #doc{} = PDoc -> PDoc; + {not_found, _} -> nil + end; + _ -> + nil + end, + + MergedDoc = if not HasStubs -> NewDoc; true -> + % This will throw an error if we have any + % attachment stubs missing data + couch_doc:merge_stubs(NewDoc, PrevDoc) + end, + check_duplicate_attachments(MergedDoc), + validate_doc_update(Db, MergedDoc, PrevDoc), + MergedDoc. + + +validate_doc_update(Db, #doc{id = <<"_design/", _/binary>>} = Doc, _) -> + case catch check_is_admin(Db) of + ok -> validate_ddoc(Db, Doc); + Error -> ?RETURN({Doc, Error}) + end; +validate_doc_update(Db, Doc, PrevDoc) -> + #{ + security_doc := Security, + validate_doc_update_funs := VDUs + } = Db, + Fun = fun() -> + JsonCtx = fabric2_util:user_ctx_to_json(Db), + lists:map(fun(VDU) -> + try + case VDU(Doc, PrevDoc, JsonCtx, Security) of + ok -> ok; + Error1 -> throw(Error1) + end + catch throw:Error2 -> + ?RETURN({Doc, Error2}) + end + end, VDUs) + end, + Stat = [couchdb, query_server, vdu_process_time], + if VDUs == [] -> ok; true -> + couch_stats:update_histogram(Stat, Fun) + end. + + +validate_ddoc(Db, DDoc) -> + try + ok = couch_index_server:validate(Db, couch_doc:with_ejson_body(DDoc)) + catch + throw:{invalid_design_doc, Reason} -> + throw({bad_request, invalid_design_doc, Reason}); + throw:{compilation_error, Reason} -> + throw({bad_request, compilation_error, Reason}); + throw:Error -> + ?RETURN({DDoc, Error}) + end. + + +validate_atomic_update(_, false) -> + ok; +validate_atomic_update(AllDocs, true) -> + % TODO actually perform the validation. This requires some hackery, we need + % to basically extract the prep_and_validate_updates function from couch_db + % and only run that, without actually writing in case of a success. + Error = {not_implemented, <<"all_or_nothing is not supported">>}, + PreCommitFailures = lists:map(fun(#doc{id=Id, revs = {Pos,Revs}}) -> + case Revs of [] -> RevId = <<>>; [RevId|_] -> ok end, + {{Id, {Pos, RevId}}, Error} + end, AllDocs), + throw({aborted, PreCommitFailures}). + + +check_duplicate_attachments(#doc{atts = Atts}) -> + lists:foldl(fun(Att, Names) -> + Name = couch_att:fetch(name, Att), + case ordsets:is_element(Name, Names) of + true -> throw({bad_request, <<"Duplicate attachments">>}); + false -> ordsets:add_element(Name, Names) + end + end, ordsets:new(), Atts). + + +get_since_seq(Db, rev, <<>>) -> + get_since_seq(Db, rev, now); + +get_since_seq(_Db, _Dir, Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0-> + fabric2_util:seq_zero_vs(); + +get_since_seq(Db, Dir, Seq) when Seq == now; Seq == <<"now">> -> + CurrSeq = fabric2_fdb:get_last_change(Db), + get_since_seq(Db, Dir, CurrSeq); + +get_since_seq(_Db, _Dir, Seq) when is_binary(Seq), size(Seq) == 24 -> + fabric2_fdb:next_vs(fabric2_fdb:seq_to_vs(Seq)); + +get_since_seq(Db, Dir, List) when is_list(List) -> + get_since_seq(Db, Dir, list_to_binary(List)); + +get_since_seq(_Db, _Dir, Seq) -> + erlang:error({invalid_since_seq, Seq}). + + +get_leaf_path(Pos, Rev, [{Pos, [{Rev, _RevInfo} | LeafPath]} | _]) -> + LeafPath; +get_leaf_path(Pos, Rev, [_WrongLeaf | RestLeafs]) -> + get_leaf_path(Pos, Rev, RestLeafs). + + +find_prev_revinfo(_Pos, []) -> + not_found; +find_prev_revinfo(Pos, [{_Rev, ?REV_MISSING} | RestPath]) -> + find_prev_revinfo(Pos - 1, RestPath); +find_prev_revinfo(_Pos, [{_Rev, #{} = RevInfo} | _]) -> + RevInfo. + + +increment_local_doc_rev(#doc{deleted = true} = Doc) -> + {ok, Doc#doc{revs = {0, [0]}}}; +increment_local_doc_rev(#doc{revs = {0, []}} = Doc) -> + {ok, Doc#doc{revs = {0, [1]}}}; +increment_local_doc_rev(#doc{revs = {0, [RevStr | _]}} = Doc) -> + try + PrevRev = binary_to_integer(RevStr), + {ok, Doc#doc{revs = {0, [PrevRev + 1]}}} + catch error:badarg -> + {error, <<"Invalid rev format">>} + end; +increment_local_doc_rev(#doc{}) -> + {error, <<"Invalid rev format">>}. + + +doc_to_revid(#doc{revs = Revs}) -> + case Revs of + {0, []} -> {0, <<>>}; + {RevPos, [Rev | _]} -> {RevPos, Rev} + end. + + +tag_docs([]) -> + []; +tag_docs([#doc{meta = Meta} = Doc | Rest]) -> + Meta1 = lists:keystore(ref, 1, Meta, {ref, make_ref()}), + NewDoc = Doc#doc{meta = Meta1}, + [NewDoc | tag_docs(Rest)]. + + +doc_tag(#doc{meta = Meta}) -> + fabric2_util:get_value(ref, Meta). + + +idrevs({Id, Revs}) when is_list(Revs) -> + {docid(Id), [rev(R) || R <- Revs]}. + + +docid(DocId) when is_list(DocId) -> + list_to_binary(DocId); +docid(DocId) -> + DocId. + + +rev(Rev) when is_list(Rev); is_binary(Rev) -> + couch_doc:parse_rev(Rev); +rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) -> + Rev. + + +maybe_stop({ok, Acc}) -> + Acc; +maybe_stop({stop, Acc}) -> + throw({stop, Acc}). + + +set_design_doc_keys(Options1) -> + Dir = couch_util:get_value(dir, Options1, fwd), + Options2 = set_design_doc_start_key(Options1, Dir), + set_design_doc_end_key(Options2, Dir). + + +set_design_doc_start_key(Options, fwd) -> + Key1 = couch_util:get_value(start_key, Options, ?FIRST_DDOC_KEY), + Key2 = max(Key1, ?FIRST_DDOC_KEY), + lists:keystore(start_key, 1, Options, {start_key, Key2}); + +set_design_doc_start_key(Options, rev) -> + Key1 = couch_util:get_value(start_key, Options, ?LAST_DDOC_KEY), + Key2 = min(Key1, ?LAST_DDOC_KEY), + lists:keystore(start_key, 1, Options, {start_key, Key2}). + + +set_design_doc_end_key(Options, fwd) -> + case couch_util:get_value(end_key_gt, Options) of + undefined -> + Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY), + Key2 = min(Key1, ?LAST_DDOC_KEY), + lists:keystore(end_key, 1, Options, {end_key, Key2}); + EKeyGT -> + Key2 = min(EKeyGT, ?LAST_DDOC_KEY), + lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2}) + end; + +set_design_doc_end_key(Options, rev) -> + case couch_util:get_value(end_key_gt, Options) of + undefined -> + Key1 = couch_util:get_value(end_key, Options, ?FIRST_DDOC_KEY), + Key2 = max(Key1, ?FIRST_DDOC_KEY), + lists:keystore(end_key, 1, Options, {end_key, Key2}); + EKeyGT -> + Key2 = max(EKeyGT, ?FIRST_DDOC_KEY), + lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2}) + end. + + +stem_revisions(#{} = Db, #doc{} = Doc) -> + #{revs_limit := RevsLimit} = Db, + #doc{revs = {RevPos, Revs}} = Doc, + case RevPos >= RevsLimit of + true -> Doc#doc{revs = {RevPos, lists:sublist(Revs, RevsLimit)}}; + false -> Doc + end. + + +open_json_doc(Db, DocId, OpenOpts, DocOpts) -> + case fabric2_db:open_doc(Db, DocId, OpenOpts) of + {not_found, missing} -> + []; + {ok, #doc{deleted = true}} -> + [{doc, null}]; + {ok, #doc{} = Doc} -> + [{doc, couch_doc:to_json_obj(Doc, DocOpts)}] + end. + + +get_cached_db(#{} = Db, Opts) when is_list(Opts) -> + MaxAge = fabric2_util:get_value(max_age, Opts, 0), + Now = erlang:monotonic_time(millisecond), + Age = Now - maps:get(check_current_ts, Db), + case Age < MaxAge of + true -> + Db; + false -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:ensure_current(TxDb) + end) + end. + + +is_replicated(Options) when is_list(Options) -> + lists:member(replicated_changes, Options). + + +get_batch_size(Options) -> + case fabric2_util:get_value(batch_size, Options) of + undefined -> + config:get_integer("fabric", "update_docs_batch_size", + ?DEFAULT_UPDATE_DOCS_BATCH_SIZE); + Val when is_integer(Val) -> + Val + end. |