summaryrefslogtreecommitdiff
path: root/src/fabric/src/fabric2_fdb.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/fabric/src/fabric2_fdb.erl')
-rw-r--r--src/fabric/src/fabric2_fdb.erl236
1 files changed, 111 insertions, 125 deletions
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 4b0182646..670ce8b49 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -24,7 +24,7 @@
delete/1,
exists/1,
- list_dbs/2,
+ list_dbs/4,
get_info/1,
get_config/1,
@@ -50,11 +50,13 @@
read_attachment/3,
write_attachment/3,
- fold_docs/4,
- fold_changes/5,
get_last_change/1,
+ fold_range/5,
+
vs_to_seq/1,
+ seq_to_vs/1,
+ next_vs/1,
debug_cluster/0,
debug_cluster/2
@@ -254,16 +256,15 @@ exists(#{name := DbName} = Db) when is_binary(DbName) ->
end.
-list_dbs(Tx, _Options) ->
+list_dbs(Tx, Callback, AccIn, Options) ->
Root = erlfdb_directory:root(),
CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
LayerPrefix = erlfdb_directory:get_name(CouchDB),
- {Start, End} = erlfdb_tuple:range({?ALL_DBS}, LayerPrefix),
- Future = erlfdb:get_range(Tx, Start, End),
- lists:map(fun({K, _V}) ->
- {?ALL_DBS, DbName} = erlfdb_tuple:unpack(K, LayerPrefix),
- DbName
- end, erlfdb:wait(Future)).
+ Prefix = erlfdb_tuple:pack({?ALL_DBS}, LayerPrefix),
+ fold_range({tx, Tx}, Prefix, fun({K, _V}, Acc) ->
+ {DbName} = erlfdb_tuple:unpack(K, Prefix),
+ Callback(DbName, Acc)
+ end, AccIn, Options).
get_info(#{} = Db) ->
@@ -508,24 +509,26 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
UpdateStatus = case {OldWinner, NewWinner} of
{not_found, #{deleted := false}} ->
created;
+ {not_found, #{deleted := true}} ->
+ deleted;
{#{deleted := true}, #{deleted := false}} ->
recreated;
{#{deleted := false}, #{deleted := false}} ->
updated;
{#{deleted := false}, #{deleted := true}} ->
+ deleted;
+ {#{deleted := true}, #{deleted := true}} ->
deleted
end,
case UpdateStatus of
- Status when Status == created orelse Status == recreated ->
- ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
- ADVal = erlfdb_tuple:pack(NewRevId),
- ok = erlfdb:set(Tx, ADKey, ADVal);
deleted ->
ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
ok = erlfdb:clear(Tx, ADKey);
- updated ->
- ok
+ _ ->
+ ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
+ ADVal = erlfdb_tuple:pack(NewRevId),
+ ok = erlfdb:set(Tx, ADKey, ADVal)
end,
% _changes
@@ -640,84 +643,6 @@ write_attachment(#{} = Db, DocId, Data) when is_binary(Data) ->
{ok, AttId}.
-fold_docs(#{} = Db, UserFun, UserAcc0, Options) ->
- #{
- tx := Tx,
- db_prefix := DbPrefix
- } = ensure_current(Db),
-
- {Reverse, Start, End} = get_dir_and_bounds(DbPrefix, Options),
-
- DocCountKey = erlfdb_tuple:pack({?DB_STATS, <<"doc_count">>}, DbPrefix),
- DocCountBin = erlfdb:wait(erlfdb:get(Tx, DocCountKey)),
-
- try
- UserAcc1 = maybe_stop(UserFun({meta, [
- {total, ?bin2uint(DocCountBin)},
- {offset, null}
- ]}, UserAcc0)),
-
- UserAcc2 = erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
- {?DB_ALL_DOCS, DocId} = erlfdb_tuple:unpack(K, DbPrefix),
- RevId = erlfdb_tuple:unpack(V),
- maybe_stop(UserFun({row, [
- {id, DocId},
- {key, DocId},
- {value, couch_doc:rev_to_str(RevId)}
- ]}, UserAccIn))
- end, UserAcc1, [{reverse, Reverse}] ++ Options),
-
- {ok, maybe_stop(UserFun(complete, UserAcc2))}
- catch throw:{stop, FinalUserAcc} ->
- {ok, FinalUserAcc}
- end.
-
-
-fold_changes(#{} = Db, SinceSeq0, UserFun, UserAcc0, Options) ->
- #{
- tx := Tx,
- db_prefix := DbPrefix
- } = ensure_current(Db),
-
- SinceSeq1 = get_since_seq(SinceSeq0),
-
- Reverse = case fabric2_util:get_value(dir, Options, fwd) of
- fwd -> false;
- rev -> true
- end,
-
- {Start0, End0} = case Reverse of
- false -> {SinceSeq1, fabric2_util:seq_max_vs()};
- true -> {fabric2_util:seq_zero_vs(), SinceSeq1}
- end,
-
- Start1 = erlfdb_tuple:pack({?DB_CHANGES, Start0}, DbPrefix),
- End1 = erlfdb_tuple:pack({?DB_CHANGES, End0}, DbPrefix),
-
- {Start, End} = case Reverse of
- false -> {erlfdb_key:first_greater_than(Start1), End1};
- true -> {Start1, erlfdb_key:first_greater_than(End1)}
- end,
-
- try
- {ok, erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
- {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
- {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
-
- Change = #{
- id => DocId,
- sequence => vs_to_seq(SeqVS),
- rev_id => RevId,
- deleted => Deleted
- },
-
- maybe_stop(UserFun(Change, UserAccIn))
- end, UserAcc0, [{reverse, Reverse}] ++ Options)}
- catch throw:{stop, FinalUserAcc} ->
- {ok, FinalUserAcc}
- end.
-
-
get_last_change(#{} = Db) ->
#{
tx := Tx,
@@ -735,17 +660,57 @@ get_last_change(#{} = Db) ->
end.
-maybe_stop({ok, Acc}) ->
- Acc;
-maybe_stop({stop, Acc}) ->
- throw({stop, Acc}).
+fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
+ #{
+ tx := Tx
+ } = ensure_current(Db),
+ fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
+
+fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
+ case fabric2_util:get_value(limit, Options) of
+ 0 ->
+ % FoundationDB treats a limit of 0 as unlimited
+ % so we have to guard for that here.
+ UserAcc;
+ _ ->
+ {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
+ Callback = fun fold_range_cb/2,
+ Acc = {skip, Skip, UserCallback, UserAcc},
+ {skip, _, UserCallback, OutAcc} =
+ erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
+ OutAcc
+ end.
-vs_to_seq(VS) ->
+vs_to_seq(VS) when is_tuple(VS) ->
+ % 51 is the versionstamp type tag
<<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({VS}),
fabric2_util:to_hex(SeqBin).
+seq_to_vs(Seq) when is_binary(Seq) ->
+ Seq1 = fabric2_util:from_hex(Seq),
+ % 51 is the versionstamp type tag
+ Seq2 = <<51:8, Seq1/binary>>,
+ {VS} = erlfdb_tuple:unpack(Seq2),
+ VS.
+
+
+next_vs({versionstamp, VS, Batch, TxId}) ->
+ {V, B, T} = case TxId =< 65535 of
+ true ->
+ {VS, Batch, TxId + 1};
+ false ->
+ case Batch =< 65535 of
+ true ->
+ {VS, Batch + 1, 0};
+ false ->
+ {VS + 1, 0, 0}
+ end
+ end,
+ {versionstamp, V, B, T}.
+
+
debug_cluster() ->
debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
@@ -753,7 +718,7 @@ debug_cluster() ->
debug_cluster(Start, End) ->
transactional(fun(Tx) ->
lists:foreach(fun({Key, Val}) ->
- io:format("~s => ~s~n", [
+ io:format(standard_error, "~s => ~s~n", [
string:pad(erlfdb_util:repr(Key), 60),
erlfdb_util:repr(Val)
])
@@ -790,7 +755,7 @@ load_validate_doc_funs(#{} = Db) ->
{end_key, <<"_design0">>}
],
- {ok, Infos1} = fold_docs(Db, FoldFun, [], Options),
+ {ok, Infos1} = fabric2_db:fold_docs(Db, FoldFun, [], Options),
Infos2 = lists:map(fun(Info) ->
#{
@@ -999,11 +964,12 @@ chunkify_attachment(Data) ->
end.
-get_dir_and_bounds(DbPrefix, Options) ->
- Reverse = case fabric2_util:get_value(dir, Options, fwd) of
- fwd -> false;
- rev -> true
+get_fold_opts(RangePrefix, Options) ->
+ Reverse = case fabric2_util:get_value(dir, Options) of
+ rev -> true;
+ _ -> false
end,
+
StartKey0 = fabric2_util:get_value(start_key, Options),
EndKeyGt = fabric2_util:get_value(end_key_gt, Options),
EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt),
@@ -1019,17 +985,17 @@ get_dir_and_bounds(DbPrefix, Options) ->
% Set the maximum bounds for the start and endkey
StartKey2 = case StartKey1 of
- undefined -> {?DB_ALL_DOCS};
- SK2 when is_binary(SK2) -> {?DB_ALL_DOCS, SK2}
+ undefined -> <<>>;
+ SK2 -> SK2
end,
EndKey2 = case EndKey1 of
- undefined -> {?DB_ALL_DOCS, <<16#FF>>};
- EK2 when is_binary(EK2) -> {?DB_ALL_DOCS, EK2}
+ undefined -> <<255>>;
+ EK2 -> EK2
end,
- StartKey3 = erlfdb_tuple:pack(StartKey2, DbPrefix),
- EndKey3 = erlfdb_tuple:pack(EndKey2, DbPrefix),
+ StartKey3 = erlfdb_tuple:pack({StartKey2}, RangePrefix),
+ EndKey3 = erlfdb_tuple:pack({EndKey2}, RangePrefix),
% FoundationDB ranges are applied as SK <= key < EK
% By default, CouchDB is SK <= key <= EK with the
@@ -1056,26 +1022,46 @@ get_dir_and_bounds(DbPrefix, Options) ->
EndKey3
end,
- {Reverse, StartKey4, EndKey4}.
+ Skip = case fabric2_util:get_value(skip, Options) of
+ S when is_integer(S), S >= 0 -> S;
+ _ -> 0
+ end,
+ Limit = case fabric2_util:get_value(limit, Options) of
+ L when is_integer(L), L >= 0 -> [{limit, L + Skip}];
+ undefined -> []
+ end,
-get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0->
- fabric2_util:seq_zero_vs();
+ TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
+ T when is_integer(T), T >= 0 -> [{target_bytes, T}];
+ undefined -> []
+ end,
-get_since_seq(Seq) when Seq == now; Seq == <<"now">> ->
- fabric2_util:seq_max_vs();
+ StreamingMode = case fabric2_util:get_value(streaming_mode, Options) of
+ undefined -> [];
+ Name when is_atom(Name) -> [{streaming_mode, Name}]
+ end,
+
+ Snapshot = case fabric2_util:get_value(snapshot, Options) of
+ undefined -> [];
+ B when is_boolean(B) -> [{snapshot, B}]
+ end,
+
+ OutOpts = [{reverse, Reverse}]
+ ++ Limit
+ ++ TargetBytes
+ ++ StreamingMode
+ ++ Snapshot,
+
+ {StartKey4, EndKey4, Skip, OutOpts}.
-get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 ->
- Seq1 = fabric2_util:from_hex(Seq),
- Seq2 = <<51:8, Seq1/binary>>,
- {SeqVS} = erlfdb_tuple:unpack(Seq2),
- SeqVS;
-get_since_seq(List) when is_list(List) ->
- get_since_seq(list_to_binary(List));
+fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
+ NewAcc = Callback(KV, Acc),
+ {skip, 0, Callback, NewAcc};
-get_since_seq(Seq) ->
- erlang:error({invalid_since_seq, Seq}).
+fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
+ {skip, N - 1, Callback, Acc}.
get_db_handle() ->