diff options
Diffstat (limited to 'src/fabric/src/fabric2_fdb.erl')
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 236 |
1 files changed, 111 insertions, 125 deletions
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index 4b0182646..670ce8b49 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -24,7 +24,7 @@ delete/1, exists/1, - list_dbs/2, + list_dbs/4, get_info/1, get_config/1, @@ -50,11 +50,13 @@ read_attachment/3, write_attachment/3, - fold_docs/4, - fold_changes/5, get_last_change/1, + fold_range/5, + vs_to_seq/1, + seq_to_vs/1, + next_vs/1, debug_cluster/0, debug_cluster/2 @@ -254,16 +256,15 @@ exists(#{name := DbName} = Db) when is_binary(DbName) -> end. -list_dbs(Tx, _Options) -> +list_dbs(Tx, Callback, AccIn, Options) -> Root = erlfdb_directory:root(), CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), LayerPrefix = erlfdb_directory:get_name(CouchDB), - {Start, End} = erlfdb_tuple:range({?ALL_DBS}, LayerPrefix), - Future = erlfdb:get_range(Tx, Start, End), - lists:map(fun({K, _V}) -> - {?ALL_DBS, DbName} = erlfdb_tuple:unpack(K, LayerPrefix), - DbName - end, erlfdb:wait(Future)). + Prefix = erlfdb_tuple:pack({?ALL_DBS}, LayerPrefix), + fold_range({tx, Tx}, Prefix, fun({K, _V}, Acc) -> + {DbName} = erlfdb_tuple:unpack(K, Prefix), + Callback(DbName, Acc) + end, AccIn, Options). get_info(#{} = Db) -> @@ -508,24 +509,26 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) -> UpdateStatus = case {OldWinner, NewWinner} of {not_found, #{deleted := false}} -> created; + {not_found, #{deleted := true}} -> + deleted; {#{deleted := true}, #{deleted := false}} -> recreated; {#{deleted := false}, #{deleted := false}} -> updated; {#{deleted := false}, #{deleted := true}} -> + deleted; + {#{deleted := true}, #{deleted := true}} -> deleted end, case UpdateStatus of - Status when Status == created orelse Status == recreated -> - ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix), - ADVal = erlfdb_tuple:pack(NewRevId), - ok = erlfdb:set(Tx, ADKey, ADVal); deleted -> ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix), ok = erlfdb:clear(Tx, ADKey); - updated -> - ok + _ -> + ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix), + ADVal = erlfdb_tuple:pack(NewRevId), + ok = erlfdb:set(Tx, ADKey, ADVal) end, % _changes @@ -640,84 +643,6 @@ write_attachment(#{} = Db, DocId, Data) when is_binary(Data) -> {ok, AttId}. -fold_docs(#{} = Db, UserFun, UserAcc0, Options) -> - #{ - tx := Tx, - db_prefix := DbPrefix - } = ensure_current(Db), - - {Reverse, Start, End} = get_dir_and_bounds(DbPrefix, Options), - - DocCountKey = erlfdb_tuple:pack({?DB_STATS, <<"doc_count">>}, DbPrefix), - DocCountBin = erlfdb:wait(erlfdb:get(Tx, DocCountKey)), - - try - UserAcc1 = maybe_stop(UserFun({meta, [ - {total, ?bin2uint(DocCountBin)}, - {offset, null} - ]}, UserAcc0)), - - UserAcc2 = erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) -> - {?DB_ALL_DOCS, DocId} = erlfdb_tuple:unpack(K, DbPrefix), - RevId = erlfdb_tuple:unpack(V), - maybe_stop(UserFun({row, [ - {id, DocId}, - {key, DocId}, - {value, couch_doc:rev_to_str(RevId)} - ]}, UserAccIn)) - end, UserAcc1, [{reverse, Reverse}] ++ Options), - - {ok, maybe_stop(UserFun(complete, UserAcc2))} - catch throw:{stop, FinalUserAcc} -> - {ok, FinalUserAcc} - end. - - -fold_changes(#{} = Db, SinceSeq0, UserFun, UserAcc0, Options) -> - #{ - tx := Tx, - db_prefix := DbPrefix - } = ensure_current(Db), - - SinceSeq1 = get_since_seq(SinceSeq0), - - Reverse = case fabric2_util:get_value(dir, Options, fwd) of - fwd -> false; - rev -> true - end, - - {Start0, End0} = case Reverse of - false -> {SinceSeq1, fabric2_util:seq_max_vs()}; - true -> {fabric2_util:seq_zero_vs(), SinceSeq1} - end, - - Start1 = erlfdb_tuple:pack({?DB_CHANGES, Start0}, DbPrefix), - End1 = erlfdb_tuple:pack({?DB_CHANGES, End0}, DbPrefix), - - {Start, End} = case Reverse of - false -> {erlfdb_key:first_greater_than(Start1), End1}; - true -> {Start1, erlfdb_key:first_greater_than(End1)} - end, - - try - {ok, erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) -> - {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix), - {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V), - - Change = #{ - id => DocId, - sequence => vs_to_seq(SeqVS), - rev_id => RevId, - deleted => Deleted - }, - - maybe_stop(UserFun(Change, UserAccIn)) - end, UserAcc0, [{reverse, Reverse}] ++ Options)} - catch throw:{stop, FinalUserAcc} -> - {ok, FinalUserAcc} - end. - - get_last_change(#{} = Db) -> #{ tx := Tx, @@ -735,17 +660,57 @@ get_last_change(#{} = Db) -> end. -maybe_stop({ok, Acc}) -> - Acc; -maybe_stop({stop, Acc}) -> - throw({stop, Acc}). +fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) -> + #{ + tx := Tx + } = ensure_current(Db), + fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options); + +fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) -> + case fabric2_util:get_value(limit, Options) of + 0 -> + % FoundationDB treats a limit of 0 as unlimited + % so we have to guard for that here. + UserAcc; + _ -> + {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options), + Callback = fun fold_range_cb/2, + Acc = {skip, Skip, UserCallback, UserAcc}, + {skip, _, UserCallback, OutAcc} = + erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts), + OutAcc + end. -vs_to_seq(VS) -> +vs_to_seq(VS) when is_tuple(VS) -> + % 51 is the versionstamp type tag <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({VS}), fabric2_util:to_hex(SeqBin). +seq_to_vs(Seq) when is_binary(Seq) -> + Seq1 = fabric2_util:from_hex(Seq), + % 51 is the versionstamp type tag + Seq2 = <<51:8, Seq1/binary>>, + {VS} = erlfdb_tuple:unpack(Seq2), + VS. + + +next_vs({versionstamp, VS, Batch, TxId}) -> + {V, B, T} = case TxId =< 65535 of + true -> + {VS, Batch, TxId + 1}; + false -> + case Batch =< 65535 of + true -> + {VS, Batch + 1, 0}; + false -> + {VS + 1, 0, 0} + end + end, + {versionstamp, V, B, T}. + + debug_cluster() -> debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>). @@ -753,7 +718,7 @@ debug_cluster() -> debug_cluster(Start, End) -> transactional(fun(Tx) -> lists:foreach(fun({Key, Val}) -> - io:format("~s => ~s~n", [ + io:format(standard_error, "~s => ~s~n", [ string:pad(erlfdb_util:repr(Key), 60), erlfdb_util:repr(Val) ]) @@ -790,7 +755,7 @@ load_validate_doc_funs(#{} = Db) -> {end_key, <<"_design0">>} ], - {ok, Infos1} = fold_docs(Db, FoldFun, [], Options), + {ok, Infos1} = fabric2_db:fold_docs(Db, FoldFun, [], Options), Infos2 = lists:map(fun(Info) -> #{ @@ -999,11 +964,12 @@ chunkify_attachment(Data) -> end. -get_dir_and_bounds(DbPrefix, Options) -> - Reverse = case fabric2_util:get_value(dir, Options, fwd) of - fwd -> false; - rev -> true +get_fold_opts(RangePrefix, Options) -> + Reverse = case fabric2_util:get_value(dir, Options) of + rev -> true; + _ -> false end, + StartKey0 = fabric2_util:get_value(start_key, Options), EndKeyGt = fabric2_util:get_value(end_key_gt, Options), EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt), @@ -1019,17 +985,17 @@ get_dir_and_bounds(DbPrefix, Options) -> % Set the maximum bounds for the start and endkey StartKey2 = case StartKey1 of - undefined -> {?DB_ALL_DOCS}; - SK2 when is_binary(SK2) -> {?DB_ALL_DOCS, SK2} + undefined -> <<>>; + SK2 -> SK2 end, EndKey2 = case EndKey1 of - undefined -> {?DB_ALL_DOCS, <<16#FF>>}; - EK2 when is_binary(EK2) -> {?DB_ALL_DOCS, EK2} + undefined -> <<255>>; + EK2 -> EK2 end, - StartKey3 = erlfdb_tuple:pack(StartKey2, DbPrefix), - EndKey3 = erlfdb_tuple:pack(EndKey2, DbPrefix), + StartKey3 = erlfdb_tuple:pack({StartKey2}, RangePrefix), + EndKey3 = erlfdb_tuple:pack({EndKey2}, RangePrefix), % FoundationDB ranges are applied as SK <= key < EK % By default, CouchDB is SK <= key <= EK with the @@ -1056,26 +1022,46 @@ get_dir_and_bounds(DbPrefix, Options) -> EndKey3 end, - {Reverse, StartKey4, EndKey4}. + Skip = case fabric2_util:get_value(skip, Options) of + S when is_integer(S), S >= 0 -> S; + _ -> 0 + end, + Limit = case fabric2_util:get_value(limit, Options) of + L when is_integer(L), L >= 0 -> [{limit, L + Skip}]; + undefined -> [] + end, -get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0-> - fabric2_util:seq_zero_vs(); + TargetBytes = case fabric2_util:get_value(target_bytes, Options) of + T when is_integer(T), T >= 0 -> [{target_bytes, T}]; + undefined -> [] + end, -get_since_seq(Seq) when Seq == now; Seq == <<"now">> -> - fabric2_util:seq_max_vs(); + StreamingMode = case fabric2_util:get_value(streaming_mode, Options) of + undefined -> []; + Name when is_atom(Name) -> [{streaming_mode, Name}] + end, + + Snapshot = case fabric2_util:get_value(snapshot, Options) of + undefined -> []; + B when is_boolean(B) -> [{snapshot, B}] + end, + + OutOpts = [{reverse, Reverse}] + ++ Limit + ++ TargetBytes + ++ StreamingMode + ++ Snapshot, + + {StartKey4, EndKey4, Skip, OutOpts}. -get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 -> - Seq1 = fabric2_util:from_hex(Seq), - Seq2 = <<51:8, Seq1/binary>>, - {SeqVS} = erlfdb_tuple:unpack(Seq2), - SeqVS; -get_since_seq(List) when is_list(List) -> - get_since_seq(list_to_binary(List)); +fold_range_cb(KV, {skip, 0, Callback, Acc}) -> + NewAcc = Callback(KV, Acc), + {skip, 0, Callback, NewAcc}; -get_since_seq(Seq) -> - erlang:error({invalid_since_seq, Seq}). +fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 -> + {skip, N - 1, Callback, Acc}. get_db_handle() -> |