path: root/src/fabric/src/fabric2_fdb.erl
diff options
Diffstat (limited to 'src/fabric/src/fabric2_fdb.erl')
1 files changed, 2085 insertions, 0 deletions
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
new file mode 100644
index 000000000..36fa451ab
--- /dev/null
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -0,0 +1,2085 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+ transactional/1,
+ transactional/3,
+ transactional/2,
+ create/2,
+ open/2,
+ ensure_current/1,
+ delete/1,
+ undelete/3,
+ remove_deleted_db/2,
+ exists/1,
+ get_dir/1,
+ list_dbs/4,
+ list_dbs_info/4,
+ list_deleted_dbs_info/4,
+ get_info/1,
+ get_info_future/2,
+ get_info_wait/1,
+ set_config/3,
+ get_stat/2,
+ incr_stat/3,
+ incr_stat/4,
+ get_all_revs/2,
+ get_all_revs_future/2,
+ get_winning_revs/3,
+ get_winning_revs_future/3,
+ get_revs_wait/2,
+ get_non_deleted_rev/3,
+ get_doc_body/3,
+ get_doc_body_future/3,
+ get_doc_body_wait/4,
+ get_local_doc_rev_future/2,
+ get_local_doc_rev_wait/1,
+ get_local_doc_body_future/3,
+ get_local_doc_body_wait/4,
+ get_local_doc/2,
+ get_local_doc_rev/3,
+ write_doc/6,
+ write_local_doc/2,
+ read_attachment/3,
+ write_attachment/4,
+ get_last_change/1,
+ fold_range/5,
+ vs_to_seq/1,
+ seq_to_vs/1,
+ next_vs/1,
+ new_versionstamp/1,
+ get_approximate_tx_size/1,
+ chunkify_binary/1,
+ chunkify_binary/2,
+ debug_cluster/0,
+ debug_cluster/2
+-record(fold_acc, {
+ db,
+ restart_tx,
+ start_key,
+ end_key,
+ limit,
+ skip,
+ retries,
+ base_opts,
+ user_fun,
+ user_acc
+-record(info_future, {
+ tx,
+ db_prefix,
+ changes_future,
+ meta_future,
+ uuid_future,
+ retries = 0
+transactional(Fun) ->
+ do_transaction(Fun, undefined).
+transactional(DbName, Options, Fun) when is_binary(DbName) ->
+ with_span(Fun, #{'' => DbName}, fun() ->
+ transactional(fun(Tx) ->
+ Fun(init_db(Tx, DbName, Options))
+ end)
+ end).
+transactional(#{tx := undefined} = Db, Fun) ->
+ DbName = maps:get(name, Db, undefined),
+ try
+ Db1 = refresh(Db),
+ Reopen = maps:get(reopen, Db1, false),
+ Db2 = maps:remove(reopen, Db1),
+ LayerPrefix = case Reopen of
+ true -> undefined;
+ false -> maps:get(layer_prefix, Db2)
+ end,
+ with_span(Fun, #{'' => DbName}, fun() ->
+ do_transaction(fun(Tx) ->
+ case Reopen of
+ true -> Fun(reopen(Db2#{tx => Tx}));
+ false -> Fun(Db2#{tx => Tx})
+ end
+ end, LayerPrefix)
+ end)
+ catch throw:{?MODULE, reopen} ->
+ with_span('db.reopen', #{'' => DbName}, fun() ->
+ transactional(Db#{reopen => true}, Fun)
+ end)
+ end;
+transactional(#{tx := {erlfdb_transaction, _}} = Db, Fun) ->
+ DbName = maps:get(name, Db, undefined),
+ with_span(Fun, #{'' => DbName}, fun() ->
+ Fun(Db)
+ end).
+do_transaction(Fun, LayerPrefix) when is_function(Fun, 1) ->
+ Db = get_db_handle(),
+ try
+ erlfdb:transactional(Db, fun(Tx) ->
+ case get(erlfdb_trace) of
+ Name when is_binary(Name) ->
+ UId = erlang:unique_integer([positive]),
+ UIdBin = integer_to_binary(UId, 36),
+ TxId = <<Name/binary, "_", UIdBin/binary>>,
+ erlfdb:set_option(Tx, transaction_logging_enable, TxId);
+ _ ->
+ ok
+ end,
+ case is_transaction_applied(Tx) of
+ true ->
+ get_previous_transaction_result();
+ false ->
+ execute_transaction(Tx, Fun, LayerPrefix)
+ end
+ end)
+ after
+ clear_transaction()
+ end.
+create(#{} = Db0, Options) ->
+ #{
+ name := DbName,
+ tx := Tx,
+ layer_prefix := LayerPrefix
+ } = Db1 = ensure_current(Db0, false),
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
+ HCA = erlfdb_hca:create(erlfdb_tuple:pack({?DB_HCA}, LayerPrefix)),
+ AllocPrefix = erlfdb_hca:allocate(HCA, Tx),
+ DbPrefix = erlfdb_tuple:pack({?DBS, AllocPrefix}, LayerPrefix),
+ erlfdb:set(Tx, DbKey, DbPrefix),
+ % This key is responsible for telling us when something in
+ % the database cache (i.e., fabric2_server's ets table) has
+ % changed and requires re-loading. This currently includes
+ % revs_limit and validate_doc_update functions. There's
+ % no order to versioning here. Its just a value that changes
+ % that is used in the ensure_current check.
+ DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
+ DbVersion = fabric2_util:uuid(),
+ erlfdb:set(Tx, DbVersionKey, DbVersion),
+ UUID = fabric2_util:uuid(),
+ Defaults = [
+ {?DB_CONFIG, <<"uuid">>, UUID},
+ {?DB_CONFIG, <<"revs_limit">>, ?uint2bin(1000)},
+ {?DB_CONFIG, <<"security_doc">>, <<"{}">>},
+ {?DB_STATS, <<"doc_count">>, ?uint2bin(0)},
+ {?DB_STATS, <<"doc_del_count">>, ?uint2bin(0)},
+ {?DB_STATS, <<"doc_design_count">>, ?uint2bin(0)},
+ {?DB_STATS, <<"doc_local_count">>, ?uint2bin(0)},
+ {?DB_STATS, <<"sizes">>, <<"external">>, ?uint2bin(2)},
+ {?DB_STATS, <<"sizes">>, <<"views">>, ?uint2bin(0)}
+ ],
+ lists:foreach(fun
+ ({P, K, V}) ->
+ Key = erlfdb_tuple:pack({P, K}, DbPrefix),
+ erlfdb:set(Tx, Key, V);
+ ({P, S, K, V}) ->
+ Key = erlfdb_tuple:pack({P, S, K}, DbPrefix),
+ erlfdb:set(Tx, Key, V)
+ end, Defaults),
+ UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
+ Options1 = lists:keydelete(user_ctx, 1, Options),
+ Db2 = Db1#{
+ uuid => UUID,
+ db_prefix => DbPrefix,
+ db_version => DbVersion,
+ revs_limit => 1000,
+ security_doc => {[]},
+ user_ctx => UserCtx,
+ check_current_ts => erlang:monotonic_time(millisecond),
+ validate_doc_update_funs => [],
+ before_doc_update => undefined,
+ after_doc_read => undefined,
+ % All other db things as we add features,
+ db_options => Options1,
+ interactive => false
+ },
+ aegis:init_db(Db2, Options).
+open(#{} = Db0, Options) ->
+ #{
+ name := DbName,
+ tx := Tx,
+ layer_prefix := LayerPrefix
+ } = Db1 = ensure_current(Db0, false),
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
+ DbPrefix = case erlfdb:wait(erlfdb:get(Tx, DbKey)) of
+ Bin when is_binary(Bin) -> Bin;
+ not_found -> erlang:error(database_does_not_exist)
+ end,
+ DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
+ DbVersion = erlfdb:wait(erlfdb:get(Tx, DbVersionKey)),
+ UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
+ Options1 = lists:keydelete(user_ctx, 1, Options),
+ UUID = fabric2_util:get_value(uuid, Options1),
+ Options2 = lists:keydelete(uuid, 1, Options1),
+ Interactive = fabric2_util:get_value(interactive, Options2, false),
+ Options3 = lists:keydelete(interactive, 1, Options2),
+ Db2 = Db1#{
+ db_prefix => DbPrefix,
+ db_version => DbVersion,
+ uuid => <<>>,
+ revs_limit => 1000,
+ security_doc => {[]},
+ user_ctx => UserCtx,
+ check_current_ts => erlang:monotonic_time(millisecond),
+ % Place holders until we implement these
+ % bits.
+ validate_doc_update_funs => [],
+ before_doc_update => undefined,
+ after_doc_read => undefined,
+ db_options => Options3,
+ interactive => Interactive
+ },
+ Db3 = load_config(Db2),
+ Db4 = aegis:open_db(Db3),
+ case {UUID, Db4} of
+ {undefined, _} -> ok;
+ {<<_/binary>>, #{uuid := UUID}} -> ok;
+ {<<_/binary>>, #{uuid := _}} -> erlang:error(database_does_not_exist)
+ end,
+ load_validate_doc_funs(Db4).
+% Match on `name` in the function head since some non-fabric2 db
+% objects might not have names and so they don't get cached
+refresh(#{tx := undefined, name := DbName} = Db) ->
+ #{
+ uuid := UUID,
+ md_version := OldVer
+ } = Db,
+ case fabric2_server:fetch(DbName, UUID) of
+ % Relying on these assumptions about the `md_version` value:
+ % - It is bumped every time `db_version` is bumped
+ % - Is a versionstamp, so we can check which one is newer
+ % - If it is `not_found`, it would sort less than a binary value
+ #{md_version := Ver} = Db1 when Ver > OldVer ->
+ Db1#{
+ user_ctx := maps:get(user_ctx, Db),
+ security_fun := maps:get(security_fun, Db),
+ interactive := maps:get(interactive, Db)
+ };
+ _ ->
+ Db
+ end;
+refresh(#{} = Db) ->
+ Db.
+reopen(#{} = OldDb) ->
+ require_transaction(OldDb),
+ #{
+ tx := Tx,
+ name := DbName,
+ uuid := UUID,
+ db_options := Options,
+ user_ctx := UserCtx,
+ security_fun := SecurityFun,
+ interactive := Interactive
+ } = OldDb,
+ Options1 = lists:keystore(user_ctx, 1, Options, {user_ctx, UserCtx}),
+ NewDb = open(init_db(Tx, DbName, Options1), Options1),
+ % Check if database was re-created
+ case {Interactive, maps:get(uuid, NewDb)} of
+ {true, _} -> ok;
+ {false, UUID} -> ok;
+ {false, _OtherUUID} -> error(database_does_not_exist)
+ end,
+ NewDb#{security_fun := SecurityFun, interactive := Interactive}.
+delete(#{} = Db) ->
+ DoRecovery = fabric2_util:do_recovery(),
+ case DoRecovery of
+ true -> soft_delete_db(Db);
+ false -> hard_delete_db(Db)
+ end.
+undelete(#{} = Db0, TgtDbName, TimeStamp) ->
+ #{
+ name := DbName,
+ tx := Tx,
+ layer_prefix := LayerPrefix
+ } = ensure_current(Db0, false),
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, TgtDbName}, LayerPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, DbKey)) of
+ Bin when is_binary(Bin) ->
+ file_exists;
+ not_found ->
+ DeletedDbTupleKey = {
+ DbName,
+ TimeStamp
+ },
+ DeleteDbKey = erlfdb_tuple:pack(DeletedDbTupleKey, LayerPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, DeleteDbKey)) of
+ not_found ->
+ not_found;
+ DbPrefix ->
+ erlfdb:set(Tx, DbKey, DbPrefix),
+ erlfdb:clear(Tx, DeleteDbKey),
+ bump_db_version(#{
+ tx => Tx,
+ db_prefix => DbPrefix
+ }),
+ ok
+ end
+ end.
+remove_deleted_db(#{} = Db0, TimeStamp) ->
+ #{
+ name := DbName,
+ tx := Tx,
+ layer_prefix := LayerPrefix
+ } = ensure_current(Db0, false),
+ DeletedDbTupleKey = {
+ DbName,
+ TimeStamp
+ },
+ DeletedDbKey = erlfdb_tuple:pack(DeletedDbTupleKey, LayerPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, DeletedDbKey)) of
+ not_found ->
+ not_found;
+ DbPrefix ->
+ erlfdb:clear(Tx, DeletedDbKey),
+ erlfdb:clear_range_startswith(Tx, DbPrefix),
+ bump_db_version(#{
+ tx => Tx,
+ db_prefix => DbPrefix
+ }),
+ ok
+ end.
+exists(#{name := DbName} = Db) when is_binary(DbName) ->
+ #{
+ tx := Tx,
+ layer_prefix := LayerPrefix
+ } = ensure_current(Db, false),
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, DbKey)) of
+ Bin when is_binary(Bin) -> true;
+ not_found -> false
+ end.
+get_dir(Tx) ->
+ Root = erlfdb_directory:root(),
+ Dir = fabric2_server:fdb_directory(),
+ CouchDB = erlfdb_directory:create_or_open(Tx, Root, Dir),
+ erlfdb_directory:get_name(CouchDB).
+list_dbs(Tx, Callback, AccIn, Options0) ->
+ Options = case fabric2_util:get_value(restart_tx, Options0) of
+ undefined -> [{restart_tx, true} | Options0];
+ _AlreadySet -> Options0
+ end,
+ LayerPrefix = get_dir(Tx),
+ Prefix = erlfdb_tuple:pack({?ALL_DBS}, LayerPrefix),
+ fold_range({tx, Tx}, Prefix, fun({K, _V}, Acc) ->
+ {DbName} = erlfdb_tuple:unpack(K, Prefix),
+ Callback(DbName, Acc)
+ end, AccIn, Options).
+list_dbs_info(Tx, Callback, AccIn, Options0) ->
+ Options = case fabric2_util:get_value(restart_tx, Options0) of
+ undefined -> [{restart_tx, true} | Options0];
+ _AlreadySet -> Options0
+ end,
+ LayerPrefix = get_dir(Tx),
+ Prefix = erlfdb_tuple:pack({?ALL_DBS}, LayerPrefix),
+ fold_range({tx, Tx}, Prefix, fun({DbNameKey, DbPrefix}, Acc) ->
+ {DbName} = erlfdb_tuple:unpack(DbNameKey, Prefix),
+ InfoFuture = get_info_future(Tx, DbPrefix),
+ Callback(DbName, InfoFuture, Acc)
+ end, AccIn, Options).
+list_deleted_dbs_info(Tx, Callback, AccIn, Options0) ->
+ Options = case fabric2_util:get_value(restart_tx, Options0) of
+ undefined -> [{restart_tx, true} | Options0];
+ _AlreadySet -> Options0
+ end,
+ LayerPrefix = get_dir(Tx),
+ Prefix = erlfdb_tuple:pack({?DELETED_DBS}, LayerPrefix),
+ fold_range({tx, Tx}, Prefix, fun({DbKey, DbPrefix}, Acc) ->
+ {DbName, TimeStamp} = erlfdb_tuple:unpack(DbKey, Prefix),
+ InfoFuture = get_info_future(Tx, DbPrefix),
+ Callback(DbName, TimeStamp, InfoFuture, Acc)
+ end, AccIn, Options).
+get_info(#{} = Db) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ get_info_wait(get_info_future(Tx, DbPrefix)).
+get_info_future(Tx, DbPrefix) ->
+ {CStart, CEnd} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix),
+ ChangesFuture = erlfdb:get_range(Tx, CStart, CEnd, [
+ {streaming_mode, exact},
+ {limit, 1},
+ {reverse, true}
+ ]),
+ UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
+ UUIDFuture = erlfdb:get(Tx, UUIDKey),
+ StatsPrefix = erlfdb_tuple:pack({?DB_STATS}, DbPrefix),
+ MetaFuture = erlfdb:get_range_startswith(Tx, StatsPrefix),
+ % Save the tx object only if it's read-only as we might retry to get the
+ % future again after the tx was reset
+ SaveTx = case erlfdb:get_writes_allowed(Tx) of
+ true -> undefined;
+ false -> Tx
+ end,
+ #info_future{
+ tx = SaveTx,
+ db_prefix = DbPrefix,
+ changes_future = ChangesFuture,
+ meta_future = MetaFuture,
+ uuid_future = UUIDFuture
+ }.
+get_info_wait(#info_future{tx = Tx, retries = Retries} = Future)
+ when Tx =:= undefined orelse Retries >= 2 ->
+ get_info_wait_int(Future);
+get_info_wait(#info_future{tx = Tx, retries = Retries} = Future) ->
+ try
+ get_info_wait_int(Future)
+ catch
+ error:{erlfdb_error, ?TRANSACTION_CANCELLED} ->
+ Future1 = get_info_future(Tx, Future#info_future.db_prefix),
+ get_info_wait(Future1#info_future{retries = Retries + 1});
+ error:{erlfdb_error, ?TRANSACTION_TOO_OLD} ->
+ ok = erlfdb:reset(Tx),
+ Future1 = get_info_future(Tx, Future#info_future.db_prefix),
+ get_info_wait(Future1#info_future{retries = Retries + 1})
+ end.
+load_config(#{} = Db) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+ {Start, End} = erlfdb_tuple:range({?DB_CONFIG}, DbPrefix),
+ Future = erlfdb:get_range(Tx, Start, End),
+ lists:foldl(fun({K, V}, DbAcc) ->
+ {?DB_CONFIG, Key} = erlfdb_tuple:unpack(K, DbPrefix),
+ case Key of
+ <<"uuid">> -> DbAcc#{uuid := V};
+ <<"revs_limit">> -> DbAcc#{revs_limit := ?bin2uint(V)};
+ <<"security_doc">> -> DbAcc#{security_doc := ?JSON_DECODE(V)}
+ end
+ end, Db, erlfdb:wait(Future)).
+set_config(#{} = Db0, Key, Val) when is_atom(Key) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db = ensure_current(Db0),
+ {BinKey, BinVal} = case Key of
+ uuid -> {<<"uuid">>, Val};
+ revs_limit -> {<<"revs_limit">>, ?uint2bin(max(1, Val))};
+ security_doc -> {<<"security_doc">>, ?JSON_ENCODE(Val)}
+ end,
+ DbKey = erlfdb_tuple:pack({?DB_CONFIG, BinKey}, DbPrefix),
+ erlfdb:set(Tx, DbKey, BinVal),
+ {ok, DbVersion} = bump_db_version(Db),
+ {ok, Db#{db_version := DbVersion, Key := Val}}.
+get_stat(#{} = Db, StatKey) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ Key = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
+ % Might need to figure out some sort of type
+ % system here. Uints are because stats are all
+ % atomic op adds for the moment.
+ ?bin2uint(erlfdb:wait(erlfdb:get(Tx, Key))).
+incr_stat(_Db, _StatKey, 0) ->
+ ok;
+incr_stat(#{} = Db, StatKey, Increment) when is_integer(Increment) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ Key = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
+ erlfdb:add(Tx, Key, Increment).
+incr_stat(_Db, _Section, _Key, 0) ->
+ ok;
+incr_stat(#{} = Db, Section, Key, Increment) when is_integer(Increment) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ BinKey = erlfdb_tuple:pack({?DB_STATS, Section, Key}, DbPrefix),
+ erlfdb:add(Tx, BinKey, Increment).
+get_all_revs(#{} = Db, DocId) ->
+ DbName = maps:get(name, Db, undefined),
+ with_span('db.get_all_revs', #{'' => DbName, '' => DocId}, fun() ->
+ Future = get_all_revs_future(Db, DocId),
+ get_revs_wait(Db, Future)
+ end).
+get_all_revs_future(#{} = Db, DocId) ->
+ Options = [{streaming_mode, want_all}],
+ get_revs_future(Db, DocId, Options).
+get_winning_revs(Db, DocId, NumRevs) ->
+ DbName = maps:get(name, Db, undefined),
+ with_span('db.get_winning_revs', #{'' => DbName, '' => DocId}, fun() ->
+ Future = get_winning_revs_future(Db, DocId, NumRevs),
+ get_revs_wait(Db, Future)
+ end).
+get_winning_revs_future(#{} = Db, DocId, NumRevs) ->
+ Options = [{reverse, true}, {limit, NumRevs}],
+ get_revs_future(Db, DocId, Options).
+get_revs_future(#{} = Db, DocId, Options) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ {StartKey, EndKey} = erlfdb_tuple:range({?DB_REVS, DocId}, DbPrefix),
+ erlfdb:fold_range_future(Tx, StartKey, EndKey, Options).
+get_revs_wait(#{} = Db, RangeFuture) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ RevRows = erlfdb:fold_range_wait(Tx, RangeFuture, fun({K, V}, Acc) ->
+ Key = erlfdb_tuple:unpack(K, DbPrefix),
+ Val = erlfdb_tuple:unpack(V),
+ [fdb_to_revinfo(Key, Val) | Acc]
+ end, []),
+ lists:reverse(RevRows).
+get_non_deleted_rev(#{} = Db, DocId, RevId) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ {RevPos, Rev} = RevId,
+ BaseKey = {?DB_REVS, DocId, true, RevPos, Rev},
+ Key = erlfdb_tuple:pack(BaseKey, DbPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ not_found ->
+ not_found;
+ Val ->
+ fdb_to_revinfo(BaseKey, erlfdb_tuple:unpack(Val))
+ end.
+get_doc_body(Db, DocId, RevInfo) ->
+ DbName = maps:get(name, Db, undefined),
+ with_span('db.get_doc_body', #{'' => DbName, '' => DocId}, fun() ->
+ Future = get_doc_body_future(Db, DocId, RevInfo),
+ get_doc_body_wait(Db, DocId, RevInfo, Future)
+ end).
+get_doc_body_future(#{} = Db, DocId, RevInfo) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ #{
+ rev_id := {RevPos, Rev}
+ } = RevInfo,
+ Key = {?DB_DOCS, DocId, RevPos, Rev},
+ {StartKey, EndKey} = erlfdb_tuple:range(Key, DbPrefix),
+ erlfdb:fold_range_future(Tx, StartKey, EndKey, []).
+get_doc_body_wait(#{} = Db0, DocId, RevInfo, Future) ->
+ #{
+ tx := Tx
+ } = Db = ensure_current(Db0),
+ #{
+ rev_id := {RevPos, Rev},
+ rev_path := RevPath
+ } = RevInfo,
+ FoldFun = aegis:wrap_fold_fun(Db, fun({_K, V}, Acc) ->
+ [V | Acc]
+ end),
+ RevBodyRows = erlfdb:fold_range_wait(Tx, Future, FoldFun, []),
+ BodyRows = lists:reverse(RevBodyRows),
+ fdb_to_doc(Db, DocId, RevPos, [Rev | RevPath], BodyRows).
+get_local_doc_rev_future(Db, DocId) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, DocId}, DbPrefix),
+ erlfdb:get(Tx, Key).
+get_local_doc_rev_wait(Future) ->
+ erlfdb:wait(Future).
+get_local_doc_body_future(#{} = Db, DocId, _Rev) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ Prefix = erlfdb_tuple:pack({?DB_LOCAL_DOC_BODIES, DocId}, DbPrefix),
+ erlfdb:get_range_startswith(Tx, Prefix).
+get_local_doc_body_wait(#{} = Db0, DocId, Rev, Future) ->
+ Db = ensure_current(Db0),
+ {_, Chunks} = lists:unzip(aegis:decrypt(Db, erlfdb:wait(Future))),
+ fdb_to_local_doc(Db, DocId, Rev, Chunks).
+get_local_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId) ->
+ RevFuture = get_local_doc_rev_future(Db, DocId),
+ Rev = get_local_doc_rev_wait(RevFuture),
+ BodyFuture = get_local_doc_body_future(Db, DocId, Rev),
+ get_local_doc_body_wait(Db, DocId, Rev, BodyFuture).
+get_local_doc_rev(_Db0, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, Val) ->
+ case Val of
+ <<255, RevBin/binary>> ->
+ % Versioned local docs
+ try
+ case erlfdb_tuple:unpack(RevBin) of
+ {?CURR_LDOC_FORMAT, Rev, _Size} -> Rev
+ end
+ catch _:_ ->
+ erlang:error({invalid_local_doc_rev, DocId, Val})
+ end;
+ <<131, _/binary>> ->
+ % Compatibility clause for an older encoding format
+ try binary_to_term(Val, [safe]) of
+ {Rev, _} -> Rev;
+ _ -> erlang:error({invalid_local_doc_rev, DocId, Val})
+ catch
+ error:badarg ->
+ erlang:error({invalid_local_doc_rev, DocId, Val})
+ end;
+ <<_/binary>> ->
+ try binary_to_integer(Val) of
+ IntVal when IntVal >= 0 ->
+ Val;
+ _ ->
+ erlang:error({invalid_local_doc_rev, DocId, Val})
+ catch
+ error:badarg ->
+ erlang:error({invalid_local_doc_rev, DocId, Val})
+ end
+ end.
+write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db = ensure_current(Db0),
+ #doc{
+ id = DocId,
+ deleted = Deleted,
+ atts = Atts
+ } = Doc,
+ % Doc body
+ ok = write_doc_body(Db, Doc),
+ % Attachment bookkeeping
+ % If a document's attachments have changed we have to scan
+ % for any attachments that may need to be deleted. The check
+ % for `>= 2` is a bit subtle. The important point is that
+ % one of the revisions will be from the new document so we
+ % have to find at least one more beyond that to assert that
+ % the attachments have not changed.
+ AttHash = fabric2_util:hash_atts(Atts),
+ RevsToCheck = [NewWinner0] ++ ToUpdate ++ ToRemove,
+ AttHashCount = lists:foldl(fun(Att, Count) ->
+ #{att_hash := RevAttHash} = Att,
+ case RevAttHash == AttHash of
+ true -> Count + 1;
+ false -> Count
+ end
+ end, 0, RevsToCheck),
+ if
+ AttHashCount == length(RevsToCheck) ->
+ ok;
+ AttHashCount >= 2 ->
+ ok;
+ true ->
+ cleanup_attachments(Db, DocId, Doc, ToRemove)
+ end,
+ % Revision tree
+ NewWinner = NewWinner0#{
+ winner := true
+ },
+ NewRevId = maps:get(rev_id, NewWinner),
+ {WKey, WVal, WinnerVS} = revinfo_to_fdb(Tx, DbPrefix, DocId, NewWinner),
+ ok = erlfdb:set_versionstamped_value(Tx, WKey, WVal),
+ lists:foreach(fun(RI0) ->
+ RI = RI0#{winner := false},
+ {K, V, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, RI),
+ ok = erlfdb:set(Tx, K, V)
+ end, ToUpdate),
+ lists:foreach(fun(RI0) ->
+ RI = RI0#{winner := false},
+ {K, _, undefined} = revinfo_to_fdb(Tx, DbPrefix, DocId, RI),
+ ok = erlfdb:clear(Tx, K),
+ ok = clear_doc_body(Db, DocId, RI0)
+ end, ToRemove),
+ % _all_docs
+ UpdateStatus = case {OldWinner, NewWinner} of
+ {not_found, #{deleted := false}} ->
+ created;
+ {not_found, #{deleted := true}} ->
+ replicate_deleted;
+ {#{deleted := true}, #{deleted := false}} ->
+ recreated;
+ {#{deleted := false}, #{deleted := false}} ->
+ updated;
+ {#{deleted := false}, #{deleted := true}} ->
+ deleted;
+ {#{deleted := true}, #{deleted := true}} ->
+ ignore
+ end,
+ case UpdateStatus of
+ replicate_deleted ->
+ ok;
+ ignore ->
+ ok;
+ deleted ->
+ ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
+ ok = erlfdb:clear(Tx, ADKey);
+ _ ->
+ ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
+ ADVal = erlfdb_tuple:pack(NewRevId),
+ ok = erlfdb:set(Tx, ADKey, ADVal)
+ end,
+ % _changes
+ if OldWinner == not_found -> ok; true ->
+ OldSeq = maps:get(sequence, OldWinner),
+ OldSeqKey = erlfdb_tuple:pack({?DB_CHANGES, OldSeq}, DbPrefix),
+ erlfdb:clear(Tx, OldSeqKey)
+ end,
+ NewSeqKey = erlfdb_tuple:pack_vs({?DB_CHANGES, WinnerVS}, DbPrefix),
+ NewSeqVal = erlfdb_tuple:pack({DocId, Deleted, NewRevId}),
+ erlfdb:set_versionstamped_key(Tx, NewSeqKey, NewSeqVal),
+ % Bump db version on design doc changes
+ IsDDoc = case of
+ <<?DESIGN_DOC_PREFIX, _/binary>> -> true;
+ _ -> false
+ end,
+ if not IsDDoc -> ok; true ->
+ bump_db_version(Db)
+ end,
+ % Update our document counts
+ case UpdateStatus of
+ created ->
+ if not IsDDoc -> ok; true ->
+ incr_stat(Db, <<"doc_design_count">>, 1)
+ end,
+ incr_stat(Db, <<"doc_count">>, 1);
+ recreated ->
+ if not IsDDoc -> ok; true ->
+ incr_stat(Db, <<"doc_design_count">>, 1)
+ end,
+ incr_stat(Db, <<"doc_count">>, 1),
+ incr_stat(Db, <<"doc_del_count">>, -1);
+ replicate_deleted ->
+ incr_stat(Db, <<"doc_del_count">>, 1);
+ ignore ->
+ ok;
+ deleted ->
+ if not IsDDoc -> ok; true ->
+ incr_stat(Db, <<"doc_design_count">>, -1)
+ end,
+ incr_stat(Db, <<"doc_count">>, -1),
+ incr_stat(Db, <<"doc_del_count">>, 1);
+ updated ->
+ ok
+ end,
+ fabric2_db_plugin:after_doc_write(Db, Doc, NewWinner, OldWinner,
+ NewRevId, WinnerVS),
+ % Update database size
+ AddSize = sum_add_rev_sizes([NewWinner | ToUpdate]),
+ RemSize = sum_rem_rev_sizes(ToRemove),
+ incr_stat(Db, <<"sizes">>, <<"external">>, AddSize - RemSize),
+ ok.
+write_local_doc(#{} = Db0, Doc) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db = ensure_current(Db0),
+ Id =,
+ {LDocKey, LDocVal, NewSize, Rows} = local_doc_to_fdb(Db, Doc),
+ {WasDeleted, PrevSize} = case erlfdb:wait(erlfdb:get(Tx, LDocKey)) of
+ <<255, RevBin/binary>> ->
+ case erlfdb_tuple:unpack(RevBin) of
+ {?CURR_LDOC_FORMAT, _Rev, Size} ->
+ {false, Size}
+ end;
+ <<_/binary>> ->
+ {false, 0};
+ not_found ->
+ {true, 0}
+ end,
+ BPrefix = erlfdb_tuple:pack({?DB_LOCAL_DOC_BODIES, Id}, DbPrefix),
+ case Doc#doc.deleted of
+ true ->
+ erlfdb:clear(Tx, LDocKey),
+ erlfdb:clear_range_startswith(Tx, BPrefix);
+ false ->
+ erlfdb:set(Tx, LDocKey, LDocVal),
+ % Make sure to clear the whole range, in case there was a larger
+ % document body there before.
+ erlfdb:clear_range_startswith(Tx, BPrefix),
+ lists:foreach(fun({K, V}) ->
+ erlfdb:set(Tx, K, aegis:encrypt(Db, K, V))
+ end, Rows)
+ end,
+ case {WasDeleted, Doc#doc.deleted} of
+ {true, false} ->
+ incr_stat(Db, <<"doc_local_count">>, 1);
+ {false, true} ->
+ incr_stat(Db, <<"doc_local_count">>, -1);
+ _ ->
+ ok
+ end,
+ incr_stat(Db, <<"sizes">>, <<"external">>, NewSize - PrevSize),
+ ok.
+read_attachment(#{} = Db, DocId, AttId) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+ Data = case erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)) of
+ not_found ->
+ throw({not_found, missing});
+ KVs ->
+ {_, Chunks} = lists:unzip(aegis:decrypt(Db, KVs)),
+ iolist_to_binary(Chunks)
+ end,
+ IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, IdKey)) of
+ <<>> ->
+ Data; % Old format, before CURR_ATT_STORAGE_VER = 0
+ <<_/binary>> = InfoBin ->
+ {?CURR_ATT_STORAGE_VER, Compressed} = erlfdb_tuple:unpack(InfoBin),
+ case Compressed of
+ true -> binary_to_term(Data, [safe]);
+ false -> Data
+ end
+ end.
+write_attachment(#{} = Db, DocId, Data, Encoding)
+ when is_binary(Data), is_atom(Encoding) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ AttId = fabric2_util:uuid(),
+ {Data1, Compressed} = case Encoding of
+ gzip ->
+ {Data, false};
+ _ ->
+ Opts = [{minor_version, 1}, {compressed, 6}],
+ CompressedData = term_to_binary(Data, Opts),
+ case size(CompressedData) < Data of
+ true -> {CompressedData, true};
+ false -> {Data, false}
+ end
+ end,
+ IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
+ InfoVal = erlfdb_tuple:pack({?CURR_ATT_STORAGE_VER, Compressed}),
+ ok = erlfdb:set(Tx, IdKey, InfoVal),
+ Chunks = chunkify_binary(Data1),
+ lists:foldl(fun(Chunk, ChunkId) ->
+ AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix),
+ ok = erlfdb:set(Tx, AttKey, aegis:encrypt(Db, AttKey, Chunk)),
+ ChunkId + 1
+ end, 0, Chunks),
+ {ok, AttId}.
+get_last_change(#{} = Db) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ {Start, End} = erlfdb_tuple:range({?DB_CHANGES}, DbPrefix),
+ Options = [{limit, 1}, {reverse, true}],
+ case erlfdb:get_range(Tx, Start, End, Options) of
+ [] ->
+ vs_to_seq(fabric2_util:seq_zero_vs());
+ [{K, _V}] ->
+ {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
+ vs_to_seq(SeqVS)
+ end.
+fold_range(TxOrDb, RangePrefix, UserFun, UserAcc, Options) ->
+ {Db, Tx} = case TxOrDb of
+ {tx, TxObj} ->
+ {undefined, TxObj};
+ #{} = DbObj ->
+ DbObj1 = #{tx := TxObj} = ensure_current(DbObj),
+ {DbObj1, TxObj}
+ end,
+ % FoundationDB treats a limit 0 of as unlimited so we guard against it
+ case fabric2_util:get_value(limit, Options) of 0 -> UserAcc; _ ->
+ FAcc = get_fold_acc(Db, RangePrefix, UserFun, UserAcc, Options),
+ try
+ fold_range(Tx, FAcc)
+ after
+ end
+ end.
+fold_range(Tx, FAcc) ->
+ #fold_acc{
+ start_key = Start,
+ end_key = End,
+ limit = Limit,
+ base_opts = BaseOpts,
+ restart_tx = DoRestart
+ } = FAcc,
+ case DoRestart of false -> ok; true ->
+ ok = erlfdb:set_option(Tx, disallow_writes)
+ end,
+ Opts = [{limit, Limit} | BaseOpts],
+ Callback = fun fold_range_cb/2,
+ try
+ #fold_acc{
+ user_acc = FinalUserAcc
+ } = erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts),
+ FinalUserAcc
+ catch error:{erlfdb_error, ?TRANSACTION_TOO_OLD} when DoRestart ->
+ % Possibly handle cluster_version_changed and future_version as well to
+ % continue iteration instead fallback to transactional and retrying
+ % from the beginning which is bound to fail when streaming data out to a
+ % socket.
+ fold_range(Tx, restart_fold(Tx, FAcc))
+ end.
+vs_to_seq(VS) when is_tuple(VS) ->
+ % 51 is the versionstamp type tag
+ <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({VS}),
+ fabric2_util:to_hex(SeqBin).
+seq_to_vs(Seq) when is_binary(Seq) ->
+ Seq1 = fabric2_util:from_hex(Seq),
+ % 51 is the versionstamp type tag
+ Seq2 = <<51:8, Seq1/binary>>,
+ {VS} = erlfdb_tuple:unpack(Seq2),
+ VS.
+next_vs({versionstamp, VS, Batch, TxId}) ->
+ {V, B, T} = case TxId =< 65535 of
+ true ->
+ {VS, Batch, TxId + 1};
+ false ->
+ case Batch =< 65535 of
+ true ->
+ {VS, Batch + 1, 0};
+ false ->
+ {VS + 1, 0, 0}
+ end
+ end,
+ {versionstamp, V, B, T}.
+new_versionstamp(Tx) ->
+ TxId = erlfdb:get_next_tx_id(Tx),
+ {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
+get_approximate_tx_size(#{} = TxDb) ->
+ require_transaction(TxDb),
+ #{tx := Tx} = TxDb,
+ erlfdb:wait(erlfdb:get_approximate_size(Tx)).
+chunkify_binary(Data) ->
+ chunkify_binary(Data, binary_chunk_size()).
+chunkify_binary(Data, Size) ->
+ case Data of
+ <<>> ->
+ [];
+ <<Head:Size/binary, Rest/binary>> ->
+ [Head | chunkify_binary(Rest, Size)];
+ <<_/binary>> when size(Data) < Size ->
+ [Data]
+ end.
+debug_cluster() ->
+ debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
+debug_cluster(Start, End) ->
+ transactional(fun(Tx) ->
+ lists:foreach(fun({Key, Val}) ->
+ io:format(standard_error, "~s => ~s~n", [
+ string:pad(erlfdb_util:repr(Key), 60),
+ erlfdb_util:repr(Val)
+ ])
+ end, erlfdb:get_range(Tx, Start, End))
+ end).
+init_db(Tx, DbName, Options) ->
+ Prefix = get_dir(Tx),
+ Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
+ #{
+ name => DbName,
+ tx => Tx,
+ layer_prefix => Prefix,
+ md_version => Version,
+ security_fun => undefined,
+ db_options => Options
+ }.
+load_validate_doc_funs(#{} = Db) ->
+ FoldFun = fun
+ ({row, Row}, Acc) ->
+ DDocInfo = #{id => fabric2_util:get_value(id, Row)},
+ {ok, [DDocInfo | Acc]};
+ (_, Acc) ->
+ {ok, Acc}
+ end,
+ Options = [
+ {start_key, <<"_design/">>},
+ {end_key, <<"_design0">>}
+ ],
+ {ok, Infos1} = fabric2_db:fold_docs(Db, FoldFun, [], Options),
+ Infos2 = lists:map(fun(Info) ->
+ #{
+ id := DDocId = <<"_design/", _/binary>>
+ } = Info,
+ Info#{
+ rev_info => get_winning_revs_future(Db, DDocId, 1)
+ }
+ end, Infos1),
+ Infos3 = lists:flatmap(fun(Info) ->
+ #{
+ id := DDocId,
+ rev_info := RevInfoFuture
+ } = Info,
+ [RevInfo] = get_revs_wait(Db, RevInfoFuture),
+ #{deleted := Deleted} = RevInfo,
+ if Deleted -> []; true ->
+ [Info#{
+ rev_info := RevInfo,
+ body => get_doc_body_future(Db, DDocId, RevInfo)
+ }]
+ end
+ end, Infos2),
+ VDUs = lists:flatmap(fun(Info) ->
+ #{
+ id := DDocId,
+ rev_info := RevInfo,
+ body := BodyFuture
+ } = Info,
+ #doc{} = Doc = get_doc_body_wait(Db, DDocId, RevInfo, BodyFuture),
+ case couch_doc:get_validate_doc_fun(Doc) of
+ nil -> [];
+ Fun -> [Fun]
+ end
+ end, Infos3),
+ Db#{
+ validate_doc_update_funs := VDUs
+ }.
+bump_metadata_version(Tx) ->
+ % The 14 zero bytes is pulled from the PR for adding the
+ % metadata version key. Not sure why 14 bytes when version
+ % stamps are only 80, but whatever for now.
+ erlfdb:set_versionstamped_value(Tx, ?METADATA_VERSION_KEY, <<0:112>>).
+check_metadata_version(#{} = Db) ->
+ #{
+ tx := Tx,
+ md_version := Version
+ } = Db,
+ AlreadyChecked = get(?PDICT_CHECKED_MD_IS_CURRENT),
+ if AlreadyChecked == true -> {current, Db}; true ->
+ case erlfdb:wait(erlfdb:get_ss(Tx, ?METADATA_VERSION_KEY)) of
+ Version ->
+ % We want to set a read conflict on the db version as we'd want
+ % to conflict with any writes to this particular db. However
+ % during db creation db prefix might not exist yet so we don't
+ % add a read-conflict on it then.
+ case maps:get(db_prefix, Db, not_found) of
+ not_found ->
+ ok;
+ <<_/binary>> = DbPrefix ->
+ DbVerKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
+ erlfdb:add_read_conflict_key(Tx, DbVerKey)
+ end,
+ {current, Db};
+ NewVersion ->
+ {stale, Db#{md_version := NewVersion}}
+ end
+ end.
+bump_db_version(#{} = Db) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+ DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
+ DbVersion = fabric2_util:uuid(),
+ ok = erlfdb:set(Tx, DbVersionKey, DbVersion),
+ ok = bump_metadata_version(Tx),
+ {ok, DbVersion}.
+check_db_version(#{} = Db, CheckDbVersion) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix,
+ db_version := DbVersion
+ } = Db,
+ AlreadyChecked = get(?PDICT_CHECKED_DB_IS_CURRENT),
+ if not CheckDbVersion orelse AlreadyChecked == true -> current; true ->
+ DbVersionKey = erlfdb_tuple:pack({?DB_VERSION}, DbPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, DbVersionKey)) of
+ DbVersion ->
+ current;
+ _NewDBVersion ->
+ stale
+ end
+ end.
+soft_delete_db(Db) ->
+ #{
+ name := DbName,
+ tx := Tx,
+ layer_prefix := LayerPrefix,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
+ Timestamp = list_to_binary(fabric2_util:iso8601_timestamp()),
+ DeletedDbKeyTuple = {?DELETED_DBS, DbName, Timestamp},
+ DeletedDbKey = erlfdb_tuple:pack(DeletedDbKeyTuple, LayerPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, DeletedDbKey)) of
+ not_found ->
+ erlfdb:set(Tx, DeletedDbKey, DbPrefix),
+ erlfdb:clear(Tx, DbKey),
+ bump_db_version(Db),
+ ok;
+ _Val ->
+ {deletion_frequency_exceeded, DbName}
+ end.
+hard_delete_db(Db) ->
+ #{
+ name := DbName,
+ tx := Tx,
+ layer_prefix := LayerPrefix,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
+ erlfdb:clear(Tx, DbKey),
+ erlfdb:clear_range_startswith(Tx, DbPrefix),
+ bump_metadata_version(Tx),
+ ok.
+write_doc_body(#{} = Db0, #doc{} = Doc) ->
+ #{
+ tx := Tx
+ } = Db = ensure_current(Db0),
+ Rows = doc_to_fdb(Db, Doc),
+ lists:foreach(fun({Key, Value}) ->
+ ok = erlfdb:set(Tx, Key, aegis:encrypt(Db, Key, Value))
+ end, Rows).
+clear_doc_body(_Db, _DocId, not_found) ->
+ % No old body to clear
+ ok;
+clear_doc_body(#{} = Db, DocId, #{} = RevInfo) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+ #{
+ rev_id := {RevPos, Rev}
+ } = RevInfo,
+ BaseKey = {?DB_DOCS, DocId, RevPos, Rev},
+ {StartKey, EndKey} = erlfdb_tuple:range(BaseKey, DbPrefix),
+ ok = erlfdb:clear_range(Tx, StartKey, EndKey).
+cleanup_attachments(Db, DocId, NewDoc, ToRemove) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+ RemoveRevs = lists:map(fun(#{rev_id := RevId}) -> RevId end, ToRemove),
+ % Gather all known document revisions
+ {ok, DiskDocs} = fabric2_db:open_doc_revs(Db, DocId, all, []),
+ AllDocs = [{ok, NewDoc} | DiskDocs],
+ % Get referenced attachment ids
+ ActiveIdSet = lists:foldl(fun({ok, Doc}, Acc) ->
+ #doc{
+ revs = {Pos, [Rev | _]}
+ } = Doc,
+ case lists:member({Pos, Rev}, RemoveRevs) of
+ true ->
+ Acc;
+ false ->
+ lists:foldl(fun(Att, InnerAcc) ->
+ {loc, _Db, _DocId, AttId} = couch_att:fetch(data, Att),
+ sets:add_element(AttId, InnerAcc)
+ end, Acc, Doc#doc.atts)
+ end
+ end, sets:new(), AllDocs),
+ AttPrefix = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId}, DbPrefix),
+ Options = [{streaming_mode, want_all}],
+ Future = erlfdb:get_range_startswith(Tx, AttPrefix, Options),
+ ExistingIdSet = lists:foldl(fun({K, _}, Acc) ->
+ {?DB_ATT_NAMES, DocId, AttId} = erlfdb_tuple:unpack(K, DbPrefix),
+ sets:add_element(AttId, Acc)
+ end, sets:new(), erlfdb:wait(Future)),
+ AttsToRemove = sets:subtract(ExistingIdSet, ActiveIdSet),
+ lists:foreach(fun(AttId) ->
+ IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
+ erlfdb:clear(Tx, IdKey),
+ ChunkKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+ erlfdb:clear_range_startswith(Tx, ChunkKey)
+ end, sets:to_list(AttsToRemove)).
+revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevId) ->
+ #{
+ deleted := Deleted,
+ rev_id := {RevPos, Rev},
+ rev_path := RevPath,
+ branch_count := BranchCount,
+ att_hash := AttHash,
+ rev_size := RevSize
+ } = RevId,
+ VS = new_versionstamp(Tx),
+ Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev},
+ Val = {
+ VS,
+ BranchCount,
+ list_to_tuple(RevPath),
+ AttHash,
+ RevSize
+ },
+ KBin = erlfdb_tuple:pack(Key, DbPrefix),
+ VBin = erlfdb_tuple:pack_vs(Val),
+ {KBin, VBin, VS};
+revinfo_to_fdb(_Tx, DbPrefix, DocId, #{} = RevId) ->
+ #{
+ deleted := Deleted,
+ rev_id := {RevPos, Rev},
+ rev_path := RevPath,
+ att_hash := AttHash,
+ rev_size := RevSize
+ } = RevId,
+ Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev},
+ Val = {?CURR_REV_FORMAT, list_to_tuple(RevPath), AttHash, RevSize},
+ KBin = erlfdb_tuple:pack(Key, DbPrefix),
+ VBin = erlfdb_tuple:pack(Val),
+ {KBin, VBin, undefined}.
+fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _, _, _} = Val) ->
+ {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
+ {_RevFormat, Sequence, BranchCount, RevPath, AttHash, RevSize} = Val,
+ #{
+ winner => true,
+ exists => true,
+ deleted => not NotDeleted,
+ rev_id => {RevPos, Rev},
+ rev_path => tuple_to_list(RevPath),
+ sequence => Sequence,
+ branch_count => BranchCount,
+ att_hash => AttHash,
+ rev_size => RevSize
+ };
+fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _} = Val) ->
+ {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
+ {_RevFormat, RevPath, AttHash, RevSize} = Val,
+ #{
+ winner => false,
+ exists => true,
+ deleted => not NotDeleted,
+ rev_id => {RevPos, Rev},
+ rev_path => tuple_to_list(RevPath),
+ sequence => undefined,
+ branch_count => undefined,
+ att_hash => AttHash,
+ rev_size => RevSize
+ };
+fdb_to_revinfo(Key, {0, Seq, BCount, RPath}) ->
+ Val = {1, Seq, BCount, RPath, <<>>},
+ fdb_to_revinfo(Key, Val);
+fdb_to_revinfo(Key, {0, RPath}) ->
+ Val = {1, RPath, <<>>},
+ fdb_to_revinfo(Key, Val);
+fdb_to_revinfo(Key, {1, Seq, BCount, RPath, AttHash}) ->
+ % Don't forget to change ?CURR_REV_FORMAT to 2 here when it increments
+ Val = {?CURR_REV_FORMAT, Seq, BCount, RPath, AttHash, 0},
+ fdb_to_revinfo(Key, Val);
+fdb_to_revinfo(Key, {1, RPath, AttHash}) ->
+ % Don't forget to change ?CURR_REV_FORMAT to 2 here when it increments
+ Val = {?CURR_REV_FORMAT, RPath, AttHash, 0},
+ fdb_to_revinfo(Key, Val).
+doc_to_fdb(Db, #doc{} = Doc) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+ #doc{
+ id = Id,
+ revs = {Start, [Rev | _]},
+ body = Body,
+ atts = Atts,
+ deleted = Deleted
+ } = Doc,
+ DiskAtts = lists:map(fun couch_att:to_disk_term/1, Atts),
+ Opts = [{minor_version, 1}, {compressed, 6}],
+ Value = term_to_binary({Body, DiskAtts, Deleted}, Opts),
+ Chunks = chunkify_binary(Value),
+ {Rows, _} = lists:mapfoldl(fun(Chunk, ChunkId) ->
+ Key = erlfdb_tuple:pack({?DB_DOCS, Id, Start, Rev, ChunkId}, DbPrefix),
+ {{Key, Chunk}, ChunkId + 1}
+ end, 0, Chunks),
+ Rows.
+fdb_to_doc(_Db, _DocId, _Pos, _Path, []) ->
+ {not_found, missing};
+fdb_to_doc(Db, DocId, Pos, Path, BinRows) when is_list(BinRows) ->
+ Bin = iolist_to_binary(BinRows),
+ {Body, DiskAtts, Deleted} = binary_to_term(Bin, [safe]),
+ Atts = lists:map(fun(Att) ->
+ couch_att:from_disk_term(Db, DocId, Att)
+ end, DiskAtts),
+ Doc0 = #doc{
+ id = DocId,
+ revs = {Pos, Path},
+ body = Body,
+ atts = Atts,
+ deleted = Deleted
+ },
+ case Db of
+ #{after_doc_read := undefined} -> Doc0;
+ #{after_doc_read := ADR} -> ADR(Doc0, Db)
+ end.
+local_doc_to_fdb(Db, #doc{} = Doc) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+ #doc{
+ id = Id,
+ revs = {0, [Rev]},
+ body = Body
+ } = Doc,
+ Key = erlfdb_tuple:pack({?DB_LOCAL_DOCS, Id}, DbPrefix),
+ StoreRev = case Rev of
+ _ when is_integer(Rev) -> integer_to_binary(Rev);
+ _ when is_binary(Rev) -> Rev
+ end,
+ BVal = term_to_binary(Body, [{minor_version, 1}, {compressed, 6}]),
+ {Rows, _} = lists:mapfoldl(fun(Chunk, ChunkId) ->
+ K = erlfdb_tuple:pack({?DB_LOCAL_DOC_BODIES, Id, ChunkId}, DbPrefix),
+ {{K, Chunk}, ChunkId + 1}
+ end, 0, chunkify_binary(BVal)),
+ NewSize = fabric2_util:ldoc_size(Doc),
+ RawValue = erlfdb_tuple:pack({?CURR_LDOC_FORMAT, StoreRev, NewSize}),
+ % Prefix our tuple encoding to make upgrades easier
+ Value = <<255, RawValue/binary>>,
+ {Key, Value, NewSize, Rows}.
+fdb_to_local_doc(_Db, _DocId, not_found, []) ->
+ {not_found, missing};
+fdb_to_local_doc(_Db, DocId, <<131, _/binary>> = Val, []) ->
+ % This is an upgrade clause for the old encoding. We allow reading the old
+ % value and will perform an upgrade of the storage format on an update.
+ {Rev, Body} = binary_to_term(Val, [safe]),
+ #doc{
+ id = DocId,
+ revs = {0, [Rev]},
+ deleted = false,
+ body = Body
+ };
+fdb_to_local_doc(_Db, DocId, <<255, RevBin/binary>>, Rows) when is_list(Rows) ->
+ Rev = case erlfdb_tuple:unpack(RevBin) of
+ {?CURR_LDOC_FORMAT, Rev0, _Size} -> Rev0
+ end,
+ BodyBin = iolist_to_binary(Rows),
+ Body = binary_to_term(BodyBin, [safe]),
+ #doc{
+ id = DocId,
+ revs = {0, [Rev]},
+ deleted = false,
+ body = Body
+ };
+fdb_to_local_doc(Db, DocId, RawRev, Rows) ->
+ BaseRev = erlfdb_tuple:pack({?CURR_LDOC_FORMAT, RawRev, 0}),
+ Rev = <<255, BaseRev/binary>>,
+ fdb_to_local_doc(Db, DocId, Rev, Rows).
+sum_add_rev_sizes(RevInfos) ->
+ lists:foldl(fun(RI, Acc) ->
+ #{
+ exists := Exists,
+ rev_size := Size
+ } = RI,
+ case Exists of
+ true -> Acc;
+ false -> Size + Acc
+ end
+ end, 0, RevInfos).
+sum_rem_rev_sizes(RevInfos) ->
+ lists:foldl(fun(RI, Acc) ->
+ #{
+ exists := true,
+ rev_size := Size
+ } = RI,
+ Size + Acc
+ end, 0, RevInfos).
+get_fold_acc(Db, RangePrefix, UserCallback, UserAcc, Options)
+ when is_map(Db) orelse Db =:= undefined ->
+ Reverse = case fabric2_util:get_value(dir, Options) of
+ rev -> true;
+ _ -> false
+ end,
+ StartKey0 = fabric2_util:get_value(start_key, Options),
+ EndKeyGt = fabric2_util:get_value(end_key_gt, Options),
+ EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt),
+ InclusiveEnd = EndKeyGt == undefined,
+ WrapKeys = fabric2_util:get_value(wrap_keys, Options) /= false,
+ % CouchDB swaps the key meanings based on the direction
+ % of the fold. FoundationDB does not so we have to
+ % swap back here.
+ {StartKey1, EndKey1} = case Reverse of
+ false -> {StartKey0, EndKey0};
+ true -> {EndKey0, StartKey0}
+ end,
+ % Set the maximum bounds for the start and endkey
+ StartKey2 = case StartKey1 of
+ undefined ->
+ <<RangePrefix/binary, 16#00>>;
+ SK2 when not WrapKeys ->
+ erlfdb_tuple:pack(SK2, RangePrefix);
+ SK2 ->
+ erlfdb_tuple:pack({SK2}, RangePrefix)
+ end,
+ EndKey2 = case EndKey1 of
+ undefined ->
+ <<RangePrefix/binary, 16#FF>>;
+ EK2 when Reverse andalso not WrapKeys ->
+ PackedEK = erlfdb_tuple:pack(EK2, RangePrefix),
+ <<PackedEK/binary, 16#FF>>;
+ EK2 when Reverse ->
+ PackedEK = erlfdb_tuple:pack({EK2}, RangePrefix),
+ <<PackedEK/binary, 16#FF>>;
+ EK2 when not WrapKeys ->
+ erlfdb_tuple:pack(EK2, RangePrefix);
+ EK2 ->
+ erlfdb_tuple:pack({EK2}, RangePrefix)
+ end,
+ % FoundationDB ranges are applied as SK <= key < EK
+ % By default, CouchDB is SK <= key <= EK with the
+ % optional inclusive_end=false option changing that
+ % to SK <= key < EK. Also, remember that CouchDB
+ % swaps the meaning of SK and EK based on direction.
+ %
+ % Thus we have this wonderful bit of logic to account
+ % for all of those combinations.
+ StartKey3 = case {Reverse, InclusiveEnd} of
+ {true, false} ->
+ erlfdb_key:first_greater_than(StartKey2);
+ _ ->
+ StartKey2
+ end,
+ EndKey3 = case {Reverse, InclusiveEnd} of
+ {false, true} when EndKey0 /= undefined ->
+ erlfdb_key:first_greater_than(EndKey2);
+ {true, _} ->
+ erlfdb_key:first_greater_than(EndKey2);
+ _ ->
+ EndKey2
+ end,
+ Skip = case fabric2_util:get_value(skip, Options) of
+ S when is_integer(S), S >= 0 -> S;
+ _ -> 0
+ end,
+ Limit = case fabric2_util:get_value(limit, Options) of
+ L when is_integer(L), L >= 0 -> L + Skip;
+ undefined -> 0
+ end,
+ TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
+ T when is_integer(T), T >= 0 -> [{target_bytes, T}];
+ undefined -> []
+ end,
+ StreamingMode = case fabric2_util:get_value(streaming_mode, Options) of
+ undefined -> [];
+ Name when is_atom(Name) -> [{streaming_mode, Name}]
+ end,
+ Snapshot = case fabric2_util:get_value(snapshot, Options) of
+ undefined -> [];
+ B when is_boolean(B) -> [{snapshot, B}]
+ end,
+ BaseOpts = [{reverse, Reverse}]
+ ++ TargetBytes
+ ++ StreamingMode
+ ++ Snapshot,
+ RestartTx = fabric2_util:get_value(restart_tx, Options, false),
+ #fold_acc{
+ db = Db,
+ start_key = StartKey3,
+ end_key = EndKey3,
+ skip = Skip,
+ limit = Limit,
+ retries = 0,
+ base_opts = BaseOpts,
+ restart_tx = RestartTx,
+ user_fun = UserCallback,
+ user_acc = UserAcc
+ }.
+fold_range_cb({K, V}, #fold_acc{} = Acc) ->
+ #fold_acc{
+ skip = Skip,
+ limit = Limit,
+ user_fun = UserFun,
+ user_acc = UserAcc,
+ base_opts = Opts
+ } = Acc,
+ Acc1 = case Skip =:= 0 of
+ true ->
+ UserAcc1 = UserFun({K, V}, UserAcc),
+ Acc#fold_acc{limit = max(0, Limit - 1), user_acc = UserAcc1};
+ false ->
+ Acc#fold_acc{skip = Skip - 1, limit = Limit - 1}
+ end,
+ Acc2 = case fabric2_util:get_value(reverse, Opts, false) of
+ true -> Acc1#fold_acc{end_key = erlfdb_key:last_less_or_equal(K)};
+ false -> Acc1#fold_acc{start_key = erlfdb_key:first_greater_than(K)}
+ end,
+ Acc2.
+restart_fold(Tx, #fold_acc{} = Acc) ->
+ ok = erlfdb:reset(Tx),
+ case {erase(?PDICT_FOLD_ACC_STATE), Acc#fold_acc.retries} of
+ {#fold_acc{db = Db} = Acc1, _} ->
+ Acc1#fold_acc{db = check_db_instance(Db), retries = 0};
+ {undefined, Retries} when Retries < ?MAX_FOLD_RANGE_RETRIES ->
+ Db = check_db_instance(Acc#fold_acc.db),
+ Acc#fold_acc{db = Db, retries = Retries + 1};
+ {undefined, _} ->
+ error(fold_range_not_progressing)
+ end.
+get_db_handle() ->
+ case get(?PDICT_DB_KEY) of
+ undefined ->
+ {ok, Db} = application:get_env(fabric, db),
+ put(?PDICT_DB_KEY, Db),
+ Db;
+ Db ->
+ Db
+ end.
+require_transaction(#{tx := {erlfdb_transaction, _}} = _Db) ->
+ ok;
+require_transaction(#{} = _Db) ->
+ erlang:error(transaction_required).
+ensure_current(Db) ->
+ ensure_current(Db, true).
+ensure_current(#{} = Db0, CheckDbVersion) ->
+ require_transaction(Db0),
+ Db3 = case check_metadata_version(Db0) of
+ {current, Db1} ->
+ Db1;
+ {stale, Db1} ->
+ case check_db_version(Db1, CheckDbVersion) of
+ current ->
+ % If db version is current, update cache with the latest
+ % metadata so other requests can immediately see the
+ % refreshed db handle.
+ Now = erlang:monotonic_time(millisecond),
+ Db2 = Db1#{check_current_ts := Now},
+ fabric2_server:maybe_update(Db2),
+ Db2;
+ stale ->
+ fabric2_server:maybe_remove(Db1),
+ throw({?MODULE, reopen})
+ end
+ end,
+ case maps:get(security_fun, Db3) of
+ SecurityFun when is_function(SecurityFun, 2) ->
+ #{security_doc := SecDoc} = Db3,
+ ok = SecurityFun(Db3, SecDoc),
+ Db3#{security_fun := undefined};
+ undefined ->
+ Db3
+ end.
+check_db_instance(undefined) ->
+ undefined;
+check_db_instance(#{} = Db) ->
+ require_transaction(Db),
+ case check_metadata_version(Db) of
+ {current, Db1} ->
+ Db1;
+ {stale, Db1} ->
+ #{
+ tx := Tx,
+ uuid := UUID,
+ db_prefix := DbPrefix
+ } = Db1,
+ UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, UUIDKey)) of
+ UUID -> Db1;
+ _ -> error(database_does_not_exist)
+ end
+ end.
+is_transaction_applied(Tx) ->
+ is_commit_unknown_result()
+ andalso has_transaction_id()
+ andalso transaction_id_exists(Tx).
+get_previous_transaction_result() ->
+execute_transaction(Tx, Fun, LayerPrefix) ->
+ Result = Fun(Tx),
+ case erlfdb:is_read_only(Tx) of
+ true ->
+ ok;
+ false ->
+ erlfdb:set(Tx, get_transaction_id(Tx, LayerPrefix), <<>>),
+ put(?PDICT_TX_RES_KEY, Result)
+ end,
+ Result.
+clear_transaction() ->
+ fabric2_txids:remove(get(?PDICT_TX_ID_KEY)),
+ erase(?PDICT_TX_ID_KEY),
+ erase(?PDICT_TX_RES_KEY).
+is_commit_unknown_result() ->
+ erlfdb:get_last_error() == ?COMMIT_UNKNOWN_RESULT.
+has_transaction_id() ->
+ is_binary(get(?PDICT_TX_ID_KEY)).
+transaction_id_exists(Tx) ->
+ erlfdb:wait(erlfdb:get(Tx, get(?PDICT_TX_ID_KEY))) == <<>>.
+get_transaction_id(Tx, LayerPrefix) ->
+ case get(?PDICT_TX_ID_KEY) of
+ undefined ->
+ TxId = fabric2_txids:create(Tx, LayerPrefix),
+ put(?PDICT_TX_ID_KEY, TxId),
+ TxId;
+ TxId when is_binary(TxId) ->
+ TxId
+ end.
+with_span(Operation, ExtraTags, Fun) ->
+ case ctrace:has_span() of
+ true ->
+ Tags = maps:merge(#{
+ 'span.kind' => <<"client">>,
+ component => <<"couchdb.fabric">>,
+ 'db.instance' => fabric2_server:fdb_cluster(),
+ 'db.namespace' => fabric2_server:fdb_directory(),
+ 'db.type' => <<"fdb">>,
+ nonce => get(nonce),
+ pid => self()
+ }, ExtraTags),
+ ctrace:with_span(Operation, Tags, Fun);
+ false ->
+ Fun()
+ end.
+get_info_wait_int(#info_future{} = InfoFuture) ->
+ #info_future{
+ db_prefix = DbPrefix,
+ changes_future = ChangesFuture,
+ uuid_future = UUIDFuture,
+ meta_future = MetaFuture
+ } = InfoFuture,
+ RawSeq = case erlfdb:wait(ChangesFuture) of
+ [] ->
+ vs_to_seq(fabric2_util:seq_zero_vs());
+ [{SeqKey, _}] ->
+ {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
+ vs_to_seq(SeqVS)
+ end,
+ CProp = {update_seq, RawSeq},
+ UUIDProp = {uuid, erlfdb:wait(UUIDFuture)},
+ MProps = lists:foldl(fun({K, V}, Acc) ->
+ case erlfdb_tuple:unpack(K, DbPrefix) of
+ {?DB_STATS, <<"doc_count">>} ->
+ [{doc_count, ?bin2uint(V)} | Acc];
+ {?DB_STATS, <<"doc_del_count">>} ->
+ [{doc_del_count, ?bin2uint(V)} | Acc];
+ {?DB_STATS, <<"sizes">>, Name} ->
+ Val = ?bin2uint(V),
+ {_, {Sizes}} = lists:keyfind(sizes, 1, Acc),
+ NewSizes = lists:keystore(Name, 1, Sizes, {Name, Val}),
+ lists:keystore(sizes, 1, Acc, {sizes, {NewSizes}});
+ {?DB_STATS, _} ->
+ Acc
+ end
+ end, [{sizes, {[]}}], erlfdb:wait(MetaFuture)),
+ [CProp, UUIDProp | MProps].
+binary_chunk_size() ->
+ config:get_integer(
+ "fabric", "binary_chunk_size", ?DEFAULT_BINARY_CHUNK_SIZE).
+fdb_to_revinfo_version_compatibility_test() ->
+ DocId = <<"doc_id">>,
+ FirstRevFormat = 0,
+ RevPos = 1,
+ Rev = <<60,84,174,140,210,120,192,18,100,148,9,181,129,165,248,92>>,
+ RevPath = {},
+ NotDeleted = true,
+ Sequence = {versionstamp, 10873034897377, 0, 0},
+ BranchCount = 1,
+ KeyWinner = {?DB_REVS, DocId, NotDeleted, RevPos, Rev},
+ ValWinner = {FirstRevFormat, Sequence, BranchCount, RevPath},
+ ExpectedWinner = expected(
+ true, BranchCount, NotDeleted, RevPos, Rev, RevPath, Sequence),
+ ?assertEqual(ExpectedWinner, fdb_to_revinfo(KeyWinner, ValWinner)),
+ KeyLoser = {?DB_REVS, DocId, NotDeleted, RevPos, Rev},
+ ValLoser = {FirstRevFormat, RevPath},
+ ExpectedLoser = expected(
+ false, undefined, NotDeleted, RevPos, Rev, RevPath, undefined),
+ ?assertEqual(ExpectedLoser, fdb_to_revinfo(KeyLoser, ValLoser)),
+ ok.
+expected(Winner, BranchCount, NotDeleted, RevPos, Rev, RevPath, Sequence) ->
+ #{
+ att_hash => <<>>,
+ branch_count => BranchCount,
+ deleted => not NotDeleted,
+ exists => true,
+ rev_id => {RevPos, Rev},
+ rev_path => tuple_to_list(RevPath),
+ rev_size => 0,
+ sequence => Sequence,
+ winner => Winner
+ }.