path: root/src/couch_views/src/couch_views_fdb.erl
diff options
Diffstat (limited to 'src/couch_views/src/couch_views_fdb.erl')
1 files changed, 331 insertions, 0 deletions
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
new file mode 100644
index 000000000..b0fb82e85
--- /dev/null
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -0,0 +1,331 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+ get_view_state/2,
+ new_interactive_index/3,
+ new_creation_vs/3,
+ get_creation_vs/2,
+ get_build_status/2,
+ set_build_status/3,
+ get_update_seq/2,
+ set_update_seq/3,
+ list_signatures/1,
+ clear_index/2,
+ persist_chunks/3,
+ update_kv_size/4
+get_view_state(Db, #mrst{} = Mrst) ->
+ get_view_state(Db, Mrst#mrst.sig);
+get_view_state(Db, Sig) when is_binary(Sig) ->
+ #{
+ tx := Tx
+ } = Db,
+ VersionF = erlfdb:get(Tx, version_key(Db, Sig)),
+ ViewSeqF = erlfdb:get(Tx, seq_key(Db, Sig)),
+ ViewVSF = erlfdb:get(Tx, creation_vs_key(Db, Sig)),
+ BuildStatusF = erlfdb:get(Tx, build_status_key(Db, Sig)),
+ Version = case erlfdb:wait(VersionF) of
+ not_found -> not_found;
+ VsnVal -> element(1, erlfdb_tuple:unpack(VsnVal))
+ end,
+ ViewSeq = case erlfdb:wait(ViewSeqF) of
+ not_found -> <<>>;
+ SeqVal -> SeqVal
+ end,
+ ViewVS = case erlfdb:wait(ViewVSF) of
+ not_found -> not_found;
+ VSVal -> element(1, erlfdb_tuple:unpack(VSVal))
+ end,
+ State = #{
+ version => Version,
+ view_seq => ViewSeq,
+ view_vs => ViewVS,
+ build_status => erlfdb:wait(BuildStatusF)
+ },
+ maybe_upgrade_view(Db, Sig, State).
+new_interactive_index(Db, #mrst{} = Mrst, VS) ->
+ new_interactive_index(Db, Mrst#mrst.sig, VS);
+new_interactive_index(Db, Sig, VS) ->
+ set_version(Db, Sig),
+ new_creation_vs(Db, Sig, VS),
+ set_build_status(Db, Sig, ?INDEX_BUILDING).
+%Interactive View Creation Versionstamp
+new_creation_vs(TxDb, #mrst{} = Mrst, VS) ->
+ new_creation_vs(TxDb, Mrst#mrst.sig, VS);
+new_creation_vs(TxDb, Sig, VS) ->
+ #{
+ tx := Tx
+ } = TxDb,
+ Key = creation_vs_key(TxDb, Sig),
+ Value = erlfdb_tuple:pack_vs({VS}),
+ ok = erlfdb:set_versionstamped_value(Tx, Key, Value).
+get_creation_vs(TxDb, MrstOrSig) ->
+ #{
+ view_vs := ViewVS
+ } = get_view_state(TxDb, MrstOrSig),
+ ViewVS.
+%Interactive View Build Status
+get_build_status(TxDb, MrstOrSig) ->
+ #{
+ build_status := BuildStatus
+ } = get_view_state(TxDb, MrstOrSig),
+ BuildStatus.
+set_build_status(TxDb, #mrst{} = Mrst, State) ->
+ set_build_status(TxDb, Mrst#mrst.sig, State);
+set_build_status(TxDb, Sig, State) ->
+ #{
+ tx := Tx
+ } = TxDb,
+ Key = build_status_key(TxDb, Sig),
+ ok = erlfdb:set(Tx, Key, State).
+% View Build Sequence Access
+% (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence
+get_update_seq(TxDb, MrstOrSig) ->
+ #{
+ view_seq := ViewSeq
+ } = get_view_state(TxDb, MrstOrSig),
+ ViewSeq.
+set_update_seq(TxDb, Sig, Seq) ->
+ #{
+ tx := Tx
+ } = TxDb,
+ ok = erlfdb:set(Tx, seq_key(TxDb, Sig), Seq).
+list_signatures(Db) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+ RangePrefix = erlfdb_tuple:pack(ViewSeqRange, DbPrefix),
+ fabric2_fdb:fold_range(Db, RangePrefix, fun({Key, _Val}, Acc) ->
+ {Sig} = erlfdb_tuple:unpack(Key, RangePrefix),
+ [Sig | Acc]
+ end, [], []).
+clear_index(Db, Signature) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+ % Get view size to remove from global counter
+ SizeTuple = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_KV_SIZE, Signature},
+ SizeKey = erlfdb_tuple:pack(SizeTuple, DbPrefix),
+ ViewSize = case erlfdb:wait(erlfdb:get(Tx, SizeKey)) of
+ not_found -> 0;
+ SizeVal -> ?bin2uint(SizeVal)
+ end,
+ % Clear index info keys
+ Keys = [
+ ],
+ lists:foreach(fun(Key) ->
+ FDBKey = erlfdb_tuple:pack(Key, DbPrefix),
+ erlfdb:clear(Tx, FDBKey)
+ end, Keys),
+ % Clear index data
+ DataTuple = {?DB_VIEWS, ?VIEW_DATA, Signature},
+ DataPrefix = erlfdb_tuple:pack(DataTuple, DbPrefix),
+ erlfdb:clear_range_startswith(Tx, DataPrefix),
+ % Clear tree data
+ TreeTuple = {?DB_VIEWS, ?VIEW_TREES, Signature},
+ TreePrefix = erlfdb_tuple:pack(TreeTuple, DbPrefix),
+ erlfdb:clear_range_startswith(Tx, TreePrefix),
+ % Decrement db wide view size counter
+ DbSizeTuple = {?DB_STATS, <<"sizes">>, <<"views">>},
+ DbSizeKey = erlfdb_tuple:pack(DbSizeTuple, DbPrefix),
+ erlfdb:add(Tx, DbSizeKey, -ViewSize).
+persist_chunks(Tx, set, [Key, Value]) ->
+ Chunks = fabric2_fdb:chunkify_binary(Value),
+ LastId = lists:foldl(fun(Chunk, Id) ->
+ ChunkKey = erlfdb_tuple:pack({Id}, Key),
+ erlfdb:set(Tx, ChunkKey, Chunk),
+ Id + 1
+ end, 0, Chunks),
+ % We update nodes in place, so its possible that
+ % a node shrank. This clears any keys that we haven't
+ % just overwritten for the provided key.
+ LastIdKey = erlfdb_tuple:pack({LastId}, Key),
+ EndRange = <<Key/binary, 16#FF>>,
+ erlfdb:clear_range(Tx, LastIdKey, EndRange);
+persist_chunks(Tx, get, Key) ->
+ Rows = erlfdb:get_range_startswith(Tx, Key),
+ Values = [V || {_K, V} <- Rows],
+ iolist_to_binary(Values);
+persist_chunks(Tx, clear, Key) ->
+ erlfdb:clear_range_startswith(Tx, Key).
+update_kv_size(TxDb, Sig, OldSize, NewSize) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = TxDb,
+ ViewTuple = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_KV_SIZE, Sig},
+ ViewKey = erlfdb_tuple:pack(ViewTuple, DbPrefix),
+ erlfdb:set(Tx, ViewKey, ?uint2bin(NewSize)),
+ DbTuple = {?DB_STATS, <<"sizes">>, <<"views">>},
+ DbKey = erlfdb_tuple:pack(DbTuple, DbPrefix),
+ erlfdb:add(Tx, DbKey, NewSize - OldSize).
+maybe_upgrade_view(_Db, _Sig, #{version := ?CURRENT_VIEW_IMPL_VERSION} = St) ->
+ St;
+maybe_upgrade_view(Db, Sig, #{version := not_found, view_seq := <<>>} = St) ->
+ % If we haven't started building the view yet
+ % then we don't change view_vs and build_status
+ % as they're still correct.
+ set_version(Db, Sig),
+ St#{
+ view_seq => <<>>
+ };
+maybe_upgrade_view(Db, Sig, #{version := not_found} = St) ->
+ clear_index(Db, Sig),
+ set_version(Db, Sig),
+ {ViewVS, BuildStatus} = reset_interactive_index(Db, Sig, St),
+ #{
+ view_seq => <<>>,
+ view_vs => ViewVS,
+ build_status => BuildStatus
+ }.
+set_version(Db, Sig) ->
+ #{
+ tx := Tx
+ } = Db,
+ Key = version_key(Db, Sig),
+ Val = erlfdb_tuple:pack({?CURRENT_VIEW_IMPL_VERSION}),
+ erlfdb:set(Tx, Key, Val).
+reset_interactive_index(_Db, _Sig, #{view_vs := not_found}) ->
+ % Not an interactive index
+ {not_found, not_found};
+reset_interactive_index(Db, Sig, _St) ->
+ % We have to reset the creation versionstamp
+ % to the current update seq of the database
+ % or else we'll not have indexed any documents
+ % inserted since the creation of the interactive
+ % index.
+ #{
+ tx := Tx
+ } = Db,
+ DbSeq = fabric2_db:get_update_seq(Db),
+ VS = fabric2_fdb:seq_to_vs(DbSeq),
+ Key = creation_vs_key(Db, Sig),
+ Val = erlfdb_tuple:pack({VS}),
+ ok = erlfdb:set(Tx, Key, Val),
+ set_build_status(Db, Sig, ?INDEX_BUILDING),
+version_key(Db, Sig) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+ erlfdb_tuple:pack(Key, DbPrefix).
+seq_key(Db, Sig) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+ erlfdb_tuple:pack(Key, DbPrefix).
+creation_vs_key(Db, Sig) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+ erlfdb_tuple:pack(Key, DbPrefix).
+build_status_key(Db, Sig) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+ erlfdb_tuple:pack(Key, DbPrefix).