diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-03-26 15:20:47 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-03-26 15:20:47 -0500 |
commit | f6e64ec5fceb4b4f2abd60ae5c4e9a8451eddd85 (patch) | |
tree | f08316dcdff3dfda2e1194215f47d8dec39aaf0c | |
parent | 6221795b5494e3bb08bc7afd4f10b0d78270bed8 (diff) | |
download | couchdb-f6e64ec5fceb4b4f2abd60ae5c4e9a8451eddd85.tar.gz |
WIP: Refactoring fdb layer code
-rw-r--r-- | src/fabric/include/fabric2.hrl | 21 | ||||
-rw-r--r-- | src/fabric/src/fabric.app.src | 2 | ||||
-rw-r--r-- | src/fabric/src/fabric2.erl | 219 | ||||
-rw-r--r-- | src/fabric/src/fabric2_app.erl (renamed from src/fabric/src/fabric_app.erl) | 4 | ||||
-rw-r--r-- | src/fabric/src/fabric2_db.erl | 808 | ||||
-rw-r--r-- | src/fabric/src/fabric2_doc.erl | 494 | ||||
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 490 | ||||
-rw-r--r-- | src/fabric/src/fabric2_security.erl (renamed from src/fabric/src/fabric_security.erl) | 0 | ||||
-rw-r--r-- | src/fabric/src/fabric2_server.erl | 91 | ||||
-rw-r--r-- | src/fabric/src/fabric2_sup.erl (renamed from src/fabric/src/fabric_sup.erl) | 6 | ||||
-rw-r--r-- | src/fabric/src/fabric2_util.erl | 109 | ||||
-rw-r--r-- | src/fabric/src/fabric_server.erl | 139 |
12 files changed, 1403 insertions, 980 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl new file mode 100644 index 000000000..e56f84fdc --- /dev/null +++ b/src/fabric/include/fabric2.hrl @@ -0,0 +1,21 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-define(SYSTEM_DATABASES, [ + <<"_dbs">>, + <<"_global_changes">>, + <<"_metadata">>, + <<"_nodes">>, + <<"_replicator">>, + <<"_users">> +]). diff --git a/src/fabric/src/fabric.app.src b/src/fabric/src/fabric.app.src index f59108290..20fbb1e2a 100644 --- a/src/fabric/src/fabric.app.src +++ b/src/fabric/src/fabric.app.src @@ -13,7 +13,7 @@ {application, fabric, [ {description, "Routing and proxying layer for CouchDB cluster"}, {vsn, git}, - {mod, {fabric_app, []}}, + {mod, {fabric2_app, []}}, {registered, [ fabric_server ]}, diff --git a/src/fabric/src/fabric2.erl b/src/fabric/src/fabric2.erl index a0d67d8a0..6acf23f48 100644 --- a/src/fabric/src/fabric2.erl +++ b/src/fabric/src/fabric2.erl @@ -13,231 +13,12 @@ -module(fabric2). --export([ - %% % Databases - %% all_dbs/0, - %% all_dbs/1, - create_db/1, - create_db/2, - delete_db/1, - delete_db/2, - open_db/1, - open_db/2, - get_db_info/1, - get_doc_count/1, - get_doc_count/2, - %% get_revs_limit/1, - %% set_revs_limit/3, - get_security/1, - set_security/2, - - %% get_purge_infos_limit/1, - %% set_purge_infos_limit/3, - %% - %% compact/1, - %% compact/2, - %% - %% get_partition_info/2, - %% - % Documents - open_doc/3, - open_revs/4, - %% - %% get_doc_info/3, - %% get_full_doc_info/3, - %% - %% get_missing_revs/2, - %% get_missing_revs/3, - - update_doc/3, - update_docs/3 - - %% purge_docs/3, - %% - %% att_receiver/2, - %% - %% % Views - %% all_docs/4, - %% all_docs/5, - %% - %% changes/4, - %% end_changes/0, - %% - %% query_view/3, - %% query_view/4, - %% query_view/6, - %% query_view/7, - %% get_view_group_info/2, - %% - %% % Miscellany - %% design_docs/1, - %% - %% reset_validation_funs/1, - %% - %% cleanup_index_files/0, - %% cleanup_index_files/1, - %% cleanup_index_files_all_nodes/1, - %% - %% dbname/1 -]). - - --include_lib("fabric/include/fabric.hrl"). --include_lib("couch/include/couch_db.hrl"). - - --define(DEFAULT_SECURITY, <<"{}">>). - - -create_db(DbName) -> - create_db(DbName, []). - - -create_db(DbName, _Options) -> - DbsDir = fabric_server:get_dir(dbs), - fabric_server:transactional(fun(Tx) -> - try - DbDir = erlfdb_directory:create(Tx, DbsDir, DbName), - fabric2_db:init(DbName, Tx, DbDir) - catch error:{erlfdb_directory, {create_error, path_exists, _}} -> - {error, file_exists} - end - end). - - -delete_db(DbName) -> - delete_db(DbName, []). - - -delete_db(DbName, _Options) -> - try - DbsDir = fabric_server:get_dir(dbs), - fabric_server:transactional(fun(Tx) -> - erlfdb_directory:remove(Tx, DbsDir, DbName) - end) - catch error:{erlfdb_directory, {remove_error, path_missing, _}} -> - erlang:error(database_does_not_exist) - end. - - -open_db(DbName) -> - open_db(DbName, []). - - -open_db(DbName, Options) -> - fabric2_db:open(DbName, Options). - - -get_db_info(DbName) when is_binary(DbName) -> - get_db_info(open_db(DbName)); - -get_db_info(Db) -> - DbProps = fabric2_db:with_tx(Db, fun(TxDb) -> - {CStart, CEnd} = fabric2_db:range_bounds(TxDb, {<<"changes">>}), - ChangesFuture = fabric2_db:get_range(TxDb, {CStart}, {CEnd}, [ - {streaming_mode, exact}, - {limit, 1}, - {reverse, true} - ]), - - StatsPrefix = {<<"meta">>, <<"stats">>}, - MetaFuture = fabric2_db:get_range_startswith(TxDb, StatsPrefix), - - RawSeq = case erlfdb:wait(ChangesFuture) of - [] -> - <<0:80>>; - [{SeqKey, _}] -> - {<<"changes">>, SeqBin} = fabric2_db:unpack(TxDb, SeqKey), - SeqBin - end, - CProp = {update_seq, ?l2b(couch_util:to_hex(RawSeq))}, - - MProps = lists:flatmap(fun({K, V}) -> - case fabric2_db:unpack(TxDb, K) of - {_, _, <<"doc_count">>} -> - [{doc_count, ?bin2uint(V)}]; - {_, _, <<"doc_del_count">>} -> - [{doc_del_count, ?bin2uint(V)}]; - {_, _, <<"size">>} -> - Val = ?bin2uint(V), - [ - {other, {[{data_size, Val}]}}, - {sizes, {[ - {active, 0}, - {external, Val}, - {file, 0} - ]}} - ]; - _ -> - [] - end - end, erlfdb:wait(MetaFuture)), - - [CProp | MProps] - end), - - BaseProps = [ - {cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}}, - {compact_running, false}, - {data_size, 0}, - {db_name, fabric2_db:name(Db)}, - {disk_format_version, 0}, - {disk_size, 0}, - {instance_start_time, <<"0">>}, - {purge_seq, 0} - ], - - {ok, lists:foldl(fun({Key, Val}, Acc) -> - lists:keystore(Key, 1, Acc, {Key, Val}) - end, BaseProps, DbProps)}. - - -get_doc_count(DbName) -> - get_doc_count(DbName, <<"_all_docs">>). - - -get_doc_count(DbName, <<"_all_docs">>) -> - get_doc_count(DbName, <<"doc_count">>); - -get_doc_count(DbName, <<"_design">>) -> - get_doc_count(DbName, <<"doc_design_count">>); - -get_doc_count(DbName, <<"_local">>) -> - get_doc_count(DbName, <<"doc_local_count">>); - -get_doc_count(Db, Key) -> - fabric2_db:with_tx(Db, fun(TxDb) -> - Future = fabric2_db:get(TxDb, {<<"meta">>, <<"stats">>, Key}), - ?bin2uint(erlfdb:wait(Future)) - end). - - -get_security(DbName) when is_binary(DbName) -> - get_security(open_db(DbName)); - -get_security(Db) -> - Key = {<<"meta">>, <<"config">>, <<"security_doc">>}, - SecJson = fabric2_db:with_tx(Db, fun(TxDb) -> - erlfdb:wait(fabric2_db:get(TxDb, Key)) - end), - ?JSON_DECODE(SecJson). - - -set_security(DbName, ErlJson) when is_binary(DbName) -> - set_security(open_db(DbName), ErlJson); - -set_security(Db, ErlJson) -> - Key = {<<"meta">>, <<"config">>, <<"security_doc">>}, - SecJson = ?JSON_ENCODE(ErlJson), - fabric2_db:with_tx(Db, fun(TxDb) -> - fabric2_db:set(TxDb, Key, SecJson) - end). open_doc(Db, DocId, Options) -> diff --git a/src/fabric/src/fabric_app.erl b/src/fabric/src/fabric2_app.erl index 77d9886a7..9becf8bfa 100644 --- a/src/fabric/src/fabric_app.erl +++ b/src/fabric/src/fabric2_app.erl @@ -10,7 +10,7 @@ % License for the specific language governing permissions and limitations under % the License. --module(fabric_app). +-module(fabric2_app). -behaviour(application). @@ -21,7 +21,7 @@ start(_Type, StartArgs) -> - fabric_sup:start_link(StartArgs). + fabric2_sup:start_link(StartArgs). stop(_State) -> diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl index 7bb7f0838..5df51dde7 100644 --- a/src/fabric/src/fabric2_db.erl +++ b/src/fabric/src/fabric2_db.erl @@ -14,175 +14,739 @@ -export([ + create/2, open/2, - init/3, + delete/2, name/1, - get_vdus/1, + get_after_doc_read_fun/1, + get_before_doc_update_fun/1, + get_committed_update_seq/1, + get_compacted_seq/1, + get_compactor_pid/1, + get_db_info/1, + %% get_partition_info/2, + get_del_doc_count/1, + get_doc_count/1, + get_doc_count/2, + %% get_epochs/1, + %% get_filepath/1, + get_instance_start_time/1, + get_pid/1, + get_revs_limit/1, + get_security/1, + get_update_seq/1, + get_user_ctx/1, + get_uuid/1, + %% get_purge_seq/1, + %% get_oldest_purge_seq/1, + %% get_purge_infos_limit/1, + + is_clustered/1, + is_db/1, + is_partitioned/1, + is_system_db/1, + is_system_db_name/1, + + set_revs_limit/2, + %% set_purge_infos_limit/2, + set_security/2, + set_user_ctx/2, + + ensure_full_commit/1, + ensure_full_commit/2, + + %% load_validation_funs/1, + %% reload_validation_funs/1, + + open_doc/2, + open_doc/3, + open_doc_revs/4, + %% open_doc_int/3, + %% get_doc_info/2, + %% get_full_doc_info/2, + %% get_full_doc_infos/2, + %% get_missing_revs/2, + %% get_design_doc/2, + %% get_design_docs/1, + %% get_design_doc_count/1, + %% get_purge_infos/2, + + %% get_minimum_purge_seq/1, + %% purge_client_exists/3, + + %% validate_docid/2, + %% doc_from_json_obj_validate/2, + + update_doc/2, + update_doc/3, + update_docs/2, + update_docs/3, + %% delete_doc/3, + + %% purge_docs/2, + %% purge_docs/3, + + %% with_stream/3, + %% open_write_stream/2, + %% open_read_stream/2, + %% is_active_stream/2, + + %% fold_docs/3, + %% fold_docs/4, + %% fold_local_docs/4, + %% fold_design_docs/4, + %% fold_changes/4, + %% fold_changes/5, + %% count_changes_since/2, + %% fold_purge_infos/4, + %% fold_purge_infos/5, + + %% calculate_start_seq/3, + %% owner_of/2, + + %% start_compact/1, + %% cancel_compact/1, + %% wait_for_compaction/1, + %% wait_for_compaction/2, + + %% dbname_suffix/1, + %% normalize_dbname/1, + %% validate_dbname/1, + + %% make_doc/5, + new_revid/1 +]). - with_tx/2, - pack/2, - pack_vs/2, - unpack/2, - range_bounds/2, +-include_lib("fabric/include/fabric2.hrl"). - get/2, - get_range/3, - get_range/4, - get_range_startswith/2, - get_range_startswith/3 -]). +-define(DBNAME_REGEX, + "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*" % use the stock CouchDB regex + "(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end +). --include_lib("couch/include/couch_db.hrl"). --include_lib("fabric/include/fabric.hrl"). +-define(RETURN(Term), throw({?MODULE, Term})). -open(DbName, Options) when is_binary(DbName), is_list(Options) -> - BaseDb = #{ - vsn => 1, - name => DbName, - tx => undefined, - dir => undefined, +create(DbName, Options) -> + Result = fabric2_util:transactional(DbName, Options, fun(TxDb) -> + case fabric2_fdb:db_exists(TxDb) of + true -> + {error, file_exists}; + false -> + fabric2_fdb:db_create(TxDb) + end, + end), + % We cache outside of the transaction so that we're sure + % that this request created the database + case Result of + #{} = Db -> + fabric2_server:store(Db); + Error -> + Error + end. + + +open(DbName, Options) -> + case fabric2_server:fetch(DbName) of + #{} = Db -> + fabric2_util:transactional(Db, fun(TxDb) -> + case fabric2_fdb:db_is_current(TxDb) of + true -> + Db; + false -> + Reopend = fabric2_fdb:db_open(TxDb), + fabric2_server:store(Reopened) + end + end); + undefined -> + fabric2_util:transactional(DbName, Options, fun(TxDb) -> + Opened = fabric2_fdb:db_open(TxDb), + fabric2_server:store(Opened) + end) + end. + + +delete(DbName, Options) -> + % This will throw if the db does not exist + Db = open(DbName, Options), + with_tx(Db, fun(TxDb) -> + fabric2_fdb:db_delete(TxDb) + end). - instance_start_time => 0, - user_ctx => #user_ctx{}, - security => [], - validate_doc_update => [], - before_doc_update => nil, - after_doc_read => nil, - options => [] - }, - lists:foldl(fun({K, V}, DbAcc) -> - maps:put(K, V, DbAcc) - end, BaseDb, Options). +name(#{name := DbName}) -> + DbName. + + +get_after_doc_read_fun(#{after_doc_read := AfterDocRead}) -> + AfterDocRead. + + +get_before_doc_update_fun(#{before_doc_update := BeforeDocUpdate}) -> + BeforeDocUpdate. +get_commited_update_seq(#{} = Db) -> + get_update_seq(Db). -name(#{name := Name}) -> - Name. +get_compacted_seq(#{} = Db) -> + get_update_seq(Db). -get_vdus(#{validate_doc_update := VDUs}) -> - VDUs. +get_compactor_pid(#{} = Db) -> + nil. -init(DbName, Tx, DbDir) -> - TxDb = open(DbName, [{tx, Tx}, {dir, DbDir}]), - Defaults = [ - {{<<"meta">>, <<"config">>, <<"revs_limit">>}, ?uint2bin(1000)}, - {{<<"meta">>, <<"config">>, <<"security_doc">>}, <<"{}">>}, - {{<<"meta">>, <<"stats">>, <<"doc_count">>}, ?uint2bin(0)}, - {{<<"meta">>, <<"stats">>, <<"doc_del_count">>}, ?uint2bin(0)}, - {{<<"meta">>, <<"stats">>, <<"doc_design_count">>}, ?uint2bin(0)}, - {{<<"meta">>, <<"stats">>, <<"doc_local_count">>}, ?uint2bin(0)}, - {{<<"meta">>, <<"stats">>, <<"size">>}, ?uint2bin(2)} + +get_db_info(#{} = Db) -> + DbProps = with_tx(Db, fun(TxDb) -> + fabric2_fdb:db_get_info(TxDb) + end), + + BaseProps = [ + {cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}}, + {compact_running, false}, + {data_size, 0}, + {db_name, DbName}, + {disk_format_version, 0}, + {disk_size, 0}, + {instance_start_time, <<"0">>}, + {purge_seq, 0} ], - lists:foreach(fun({K, V}) -> - erlfdb:set(Tx, pack(TxDb, K), V) - end, Defaults). - - -with_tx(Db, Fun) when is_function(Fun, 1) -> - DbName = maps:get(name, Db), - DbsDir = fabric_server:get_dir(dbs), - fabric_server:transactional(fun(Tx) -> - % We'll eventually want to cache this in the - % fabric_server ets table. - DbDir = try - erlfdb_directory:open(Tx, DbsDir, DbName) - catch error:{erlfdb_directory, {open_error, path_missing, _}} -> - erlang:error(database_does_not_exist) - end, - TxDb = Db#{tx := Tx, dir := DbDir}, - Fun(TxDb) + + {ok, lists:foldl(fun({Key, Val}, Acc) -> + lists:keystore(Key, 1, Acc, {Key, Val}) + end, BaseProps, DbProps)}. + + +get_del_doc_count(#{} = Db) -> + get_doc_count(Db, <<"doc_del_count">>). + + +get_doc_count(Db) -> + get_doc_count(Db, <<"doc_count">>). + + +get_doc_count(Db, <<"_all_docs">>) -> + get_doc_count(Db, <<"doc_count">>); + +get_doc_count(DbName, <<"_design">>) -> + get_doc_count(DbName, <<"doc_design_count">>); + +get_doc_count(DbName, <<"_local">>) -> + get_doc_count(DbName, <<"doc_local_count">>); + +get_doc_count(Db, Key) -> + with_tx(Db, fun(TxDb) -> + fabric2_fdb:get_stat(TxDb, Key) end). -pack(#{dir := undefined} = Db, _Tuple) -> - erlang:error({no_directory, Db}); +get_instance_startime(#{}) -> + 0. -pack(Db, Tuple) -> - #{ - dir := Dir - } = Db, - erlfdb_directory:pack(Dir, Tuple). +get_pid(#{}) -> + nil. -pack_vs(#{dir := undefined} = Db, _Tuple) -> - erlang:error({no_directory, Db}); -pack_vs(Db, Tuple) -> - #{ - dir := Dir - } = Db, - erlfdb_directory:pack_vs(Dir, Tuple). +get_revs_limit(#{revs_limit := RevsLimit}) -> + RevsLimit. -unpack(#{dir := undefined} = Db, _Key) -> - erlang:error({no_directory, Db}); +get_security(#{security_doc := SecurityDoc}) -> + SecurityDoc. -unpack(Db, Key) -> - #{ - dir := Dir - } = Db, - erlfdb_directory:unpack(Dir, Key). +get_update_seq(#{} = Db) -> + case fabric2_fdb:db_get_changes(Db, [{limit, 1}, {reverse, true}]) of + [] -> + fabric2_util:to_hex(<<0:80>>); + [{Seq, _}] -> + Seq + end. -range_bounds(#{dir := undefined} = Db, _Key) -> - erlang:error({no_directory, Db}); -range_bounds(Db, Key) -> - #{ - dir := Dir - } = Db, - erlfdb_directory:range(Dir, Key). +get_uuid(#{uuid := UUID}) -> + UUID. +is_clustered(#{}) -> + false. -get(#{tx := undefined} = Db, _Key) -> - erlang:error({invalid_tx_db, Db}); -get(Db, Key) -> - #{ - tx := Tx, - dir := Dir - } = Db, - BinKey = erlfdb_directory:pack(Dir, Key), - erlfdb:get(Tx, BinKey). +is_db(#{name := _}) -> + true; +is_db(_) -> + false. -get_range(Db, StartKey, EndKey) -> - get_range(Db, StartKey, EndKey, []). +is_partitioned(#{}) -> + false. -get_range(#{tx := undefined} = Db, _, _, _) -> - erlang:error({invalid_tx_db, Db}); +is_system_db(#{name := DbName}) -> + is_system_db_name(DbName). -get_range(Db, StartKey, EndKey, Options) -> - #{ - tx := Tx, - dir := Dir - } = Db, - BinStartKey = erlfdb_directory:pack(Dir, StartKey), - BinEndKey= erlfdb_directory:pack(Dir, EndKey), - erlfdb:get_range(Tx, BinStartKey, BinEndKey, Options). +is_system_db_name(DbName) when is_list(DbName) -> + is_system_db_name(?l2b(DbName)); +is_system_db_name(DbName) when is_binary(DbName) -> + Suffix = filename:basename(DbName), + case {filename:dirname(DbName), lists:member(Suffix, ?SYSTEM_DATABASES)} of + {<<".">>, Result} -> Result; + {_Prefix, false} -> false; + {Prefix, true} -> + ReOpts = [{capture,none}, dollar_endonly], + re:run(Prefix, ?DBNAME_REGEX, ReOpts) == match + end. -get_range_startswith(Db, Prefix) -> - get_range_startswith(Db, Prefix, []). +set_revs_limit(#{} = Db, RevsLimit) -> + RevsLimBin = ?uint2bin(RevsLimit), + with_tx(Db, fun(TxDb) -> + fabric2_fdb:db_set_config(TxDb, <<"revs_limit">>, RevsLimBin) + end). -get_range_startswith(#{tx := undefined} = Db, _, _) -> - erlang:error({invalid_tx_db, Db}); -get_range_startswith(Db, Prefix, Options) -> +set_security(#{} = Db, Security) -> + SecBin = ?JSON_ENCODE(Security), + with_tx(Db, fun(TxDb) -> + fabric2_fdb:db_set_config(TxDb, <<"security_doc">>, SecBin) + end). + + +set_user_ctx(#{} = Db, UserCtx) -> + Db#{user_ctx => UserCtx}. + + +ensure_full_commit(#{}) -> + {ok, 0}. + + +ensure_full_commit(#{}, _Timeout) -> + {ok, 0}. + + +open_doc(#{} = Db, DocId) -> + open_doc(Db, DocId, []). + + +open_doc(#{} = Db, DocId, Options) -> + with_tx(Db, fun(TxDb) -> + case fabric2_fdb:get_full_doc_info(TxDb, DocId) of + not_found -> + {not_found, missing}; + #full_doc_info{} = FDI -> + {_, Path} = couch_doc:to_doc_info_path(FDI), + case fabric2_fdb:get_doc_body(TxDb, DocId, Path) of + #doc{} = Doc -> {ok, Doc}; + Error -> Error + end + end + end). + + +open_doc_revs(Db, FDI, Revs, Options) -> + #full_doc_info{ + id = Id, + rev_tree = RevTree + } = FDI, + Latest = lists:member(latest, Options), + {Found, Missing} = case Revs of + all -> + {couch_key_tree:get_all_leafs(RevTree), []}; + _ when Latest -> + couch_key_tree:get_key_leafs(RevTree, Revs); + _ -> + couch_key_tree:get(RevTree, Revs) + end, + Docs = with_tx(Db, fun(TxDb) -> + lists:map(fun({Value, {Pos, [Rev | _]} = RevPath}) -> + case Value of + ?REV_MISSING -> + % We have the rev in our list but know nothing about it + {{not_found, missing}, {Pos, Rev}}; + _ -> + case fabric2_fdb:get_doc_body(Db, Id, RevPath) of + #doc{} = Doc -> {ok, Doc}; + Else -> {Else, {Pos, Rev}} + end + end + end, Found) + end), + MissingDocs = [{{not_found, missing}, MRev} || MRev <- Missing], + {ok, Docs ++ MissingDocs}. + + +update_doc(Db, Doc) -> + update_doc(Db, Doc, []). + + +update_doc(Db, Doc, Options) -> + with_tx(Db, fun(TxDb) -> + update_doc_int(TxDb, Doc, Options) + end). + + +update_docs(Db, Docs) -> + update_docs(Db, Docs, []). + + +update_docs(Db, Docs, Options) -> + with_tx(Db, fun(TxDb) -> + {Resps, Status} = lists:mapfoldl(fun(Doc, Acc) -> + case fabric2_doc:update(TxDb, Doc, opts(Options)) of + {ok, _} = Resp -> + {Resp, Acc}; + {error, _} = Resp -> + {Resp, error} + end + end, ok, Docs), + {Status, Resps} + end). + + +new_revid(Doc) -> + #doc{ + body = Body, + revs = {OldStart, OldRevs}, + atts = Atts, + deleted = Deleted + } = Doc, + + DigestedAtts = lists:foldl(fun(Att, Acc) -> + [N, T, M] = couch_att:fetch([name, type, md5], Att), + case M == <<>> of + true -> Acc; + false -> [{N, T, M} | Acc] + end + end, [], Atts), + + Rev = case DigestedAtts of + Atts2 when length(Atts) =/= length(Atts2) -> + % We must have old style non-md5 attachments + list_to_binary(integer_to_list(couch_util:rand32())); + Atts2 -> + OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end, + SigTerm = [Deleted, OldStart, OldRev, Body, Atts2], + couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}])) + end, + + Doc#doc{revs = {OldStart + 1, [Rev | OldRevs]}}. + + +% TODO: Handle _local docs separately. +update_doc_int(#{} = Db, #doc{} = Doc0, Options) -> + UpdateType = case lists:member(replicated_changes, Options) of + true -> replicated_changes; + false -> interactive_edit + end, + + try + FDI1 = fabric2_fdb:get_full_doc_info(Db, Doc0#doc.id), + Doc1 = prep_and_validate(TxDb, FDI1, Doc0, UpdateType), + Doc2 = case UpdateType of + interactive_edit -> new_revid(Doc1); + replicated_changes -> Doc1 + end, + FDI2 = if FDI1 /= not_found -> FDI1; true -> + #full_doc_info{id = Doc2#doc.id} + end, + {FDI3, Doc3} = merge_rev_tree(FDI2, Doc2, UpdateType), + + OldExists = case FDI1 of + not_found -> false; + #full_doc_info{deleted = true} -> false; + _ -> true + end, + NewExists = not FDI3#full_doc_info.deleted, + + ok = fabric2_fdb:write_doc(Db, FDI3, Doc3) + + case {OldExists, NewExists} of + {false, true} -> + fabric2_fdb:incr_stat(Db, <<"doc_count">>, 1); + {true, false} -> + fabric2_fdb:incr_stat(Db, <<"doc_count">>, -1), + fabric2_fdb:incr_stat(Db, <<"doc_del_count">>, 1); + {Exists, Exists} -> + % No change + ok + end, + + % Need to count design documents + % Need to track db size changes + + {ok, {RevStart, Rev}} + catch throw:{?MODULE, Return} -> + Return + end. + +prep_and_validate(Db, not_found, Doc, UpdateType) -> + case Doc#doc.revs of + {0, []} -> + ok; + _ when UpdateType == replicated_changes -> + ok; + _ -> + ?RETURN({error, conflict}) + end, + prep_and_validate(Db, Doc, fun() -> nil end); + +prep_and_validate(Db, FDI, Doc, interactive_edit) -> + #doc{ + revs = {Start, Revs} + } = Doc, + + Leafs = couch_key_tree:get_all_leafs(FDI#full_doc_info.rev_tree), + LeafRevs = lists:map(fun({_Leaf, {LeafStart, [LeafRev | _] = Path}}) -> + {{LeafStart, LeafRev}, Path} + end, Leafs), + + GetDocFun = case Revs of + [PrevRev | _] -> + case lists:keyfind({Start, PrevRev}, 1, LeafRevs) of + {{Start, PrevRev}, Path} -> + fun() -> + fabric2_fdb:get_doc_body(Db, Doc#doc.id, {Start, Path}) + end; + false -> + ?RETURN({error, conflict}) + end; + [] -> + case FDI#full_doc_info.deleted of + true -> + fun() -> nil end; + false -> + ?RETURN({error, conflict}) + end + end, + prep_and_validate(TxDb, Doc, GetDocFun); + +prep_and_validate(TxDb, FDI, Doc, replicated_changes) -> + #full_doc_info{ + rev_tree = RevTree + } = FDI, + OldLeafs = couch_key_tree:get_all_leafs_full(RevTree), + OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _} | _]} <- OldLeafs], + + NewPath = couch_doc:to_path(Doc), + NewRevTree = couch_key_tree:merge(RevTree, NewPath), + + Leafs = couch_key_tree:get_all_leafs_full(NewRevTree), + LeafRevsFull = lists:map(fun({Start, [{RevId, _} | _]} = FullPath) -> + [{{Start, RevId}, FullPath}] + end, Leafs), + LeafRevsFullDict = dict:from_list(LeafRevsFull), + + #doc{ + revs = {DocStart, [DocRev | _]} + } = Doc, + DocRevId = {DocStart, DocRev}, + + IsOldLeaf = lists:member(DocRevId, OldLeafsLU), + GetDocFun = case dict:find(DocRevId, LeafRevsFullDict) of + {ok, {DocStart, RevPath}} when not IsOldLeaf -> + % An incoming replicated edit only sends us leaf + % nodes which may have included multiple updates + % we haven't seen locally. Thus we have to search + % back through the tree to find the first edit + % we do know about. + case find_prev_known_rev(DocStart, RevPath) of + not_found -> fun() -> nil end; + PrevRevs -> fun() -> + fabric2_fdb:get_doc_body(Db, Doc#doc.id, PrevRevs) + end + end; + _ -> + % The update merged to an internal node that we + % already know about which means we're done with + % this update. + ?RETURN({ok, []}) + end, + + prep_and_validate(TxDb, Doc, GetDocFun). + + +prep_and_validate(Db, Doc, GetDocBody) -> + NewDoc = case couch_doc:has_stubs(Doc) of + true -> + case GetDocBody() of + #doc{} = PrevDoc -> + couch_doc:merge_stubs(Doc, PrevDoc); + _ -> + % Force a missing stubs error + couch_doc:mege_stubs(Doc, #doc{}) + end; + false -> + Doc + end, + validate_doc_update(Db, NewDoc, GetDocBody), + NewDoc. + + +merge_rev_tree(FDI, Doc, interactive_edit) when FDI#full_doc_info.deleted -> + % We're recreating a document that was previously + % deleted. To check that this is a recreation from + % the root we assert that the new document has a + % revision depth of 1 (this is to avoid recreating a + % doc from a previous internal revision) and is also + % not deleted. To avoid expanding the revision tree + % unnecessarily we create a new revision based on + % the winning deleted revision. + + {RevDepth, _} = Doc#doc.revs, + case RevDepth == 1 andalso not Doc#doc.deleted of + true -> + % Update the new doc based on revisions in OldInfo + #doc_info{revs=[WinningRev | _]} = couch_doc:to_doc_info(FDI), + #rev_info{rev={OldPos, OldRev}} = WinningRev, + Body = case couch_util:get_value(comp_body, Doc#doc.meta) of + CompBody when is_binary(CompBody) -> + couch_compress:decompress(CompBody); + _ -> + Doc#doc.body + end, + NewDoc = new_revid(Doc#doc{ + revs = {OldPos, [OldRev]}, + body = Body + }), + + % Merge our modified new doc into the tree + #full_doc_info{rev_tree = RevTree} = FDI, + case couch_key_tree:merge(RevTree, couch_doc:to_path(NewDoc)) of + {NewRevTree, new_leaf} -> + % We changed the revision id so inform the caller + NewFDI = FDI#full_doc_info{ + rev_tree = NewRevTree, + deleted = false + }, + {NewFDI, NewDoc}; + _ -> + throw(doc_recreation_failed) + end; + _ -> + ?RETURN({error, conflict}) + end; +merge_rev_tree(FDI, Doc, interactive_edit) -> + % We're attempting to merge a new revision into an + % undeleted document. To not be a conflict we require + % that the merge results in extending a branch. + + RevTree = FDI#full_doc_info.rev_tree, + case couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)) of + {NewRevTree, new_leaf} when not Doc#doc.deleted -> + NewFDI = FDI#full_doc_info{ + rev_tree = NewRevTree, + deleted = false + }, + {NewFDI, Doc}; + {NewRevTree, new_leaf} when Doc#doc.deleted -> + % We have to check if we just deleted this + % document completely or if it was a conflict + % resolution. + NewFDI = FDI#full_doc_info{ + rev_tree = NewRevTree, + deleted = couch_doc:is_deleted(NewRevTree) + }, + {NewFDI, Doc}; + _ -> + ?RETURN({error, conflict}) + end; +merge_rev_tree(FDI, Doc, replicated_changes) -> + % We're merging in revisions without caring about + % conflicts. Most likely this is a replication update. + RevTree = FDI#full_doc_info.rev_tree, + {NewRevTree, _} = couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)), + NewFDI = FDI#full_doc_info{ + rev_tree = NewRevTree, + deleted = couch_doc:is_deleted(NewRevTree) + }, + % If a replicated change did not modify the revision + % tree then we've got nothing else to do. + if NewFDI /= FDI -> ok; true -> + ?RETURN({ok, []}) + end, + {NewFDI, Doc}. + + +validate_doc_update(Db, #doc{id = <<"_design/", _/binary>>} = Doc, _) -> + #{ + security_doc := Security + } = Db, + case catch check_is_admin(Db) of + ok -> validate_ddoc(Db, Doc); + Error -> ?RETURN(Error) + end; +validate_doc_update(_Db, #doc{id = <<"_local/", _/binary>>}, _) -> + ok; +validate_doc_update(Db, Doc, GetDiskDocFun) -> #{ - tx := Tx, - dir := Dir + security_doc := Security, + user_ctx := UserCtx, + validate_doc_update_funs := VDUs } = Db, - BinPrefix = erlfdb_directory:pack(Dir, Prefix), - erlfdb:get_range_startswith(Tx, BinPrefix, Options). + Fun = fun() -> + DiskDoc = GetDiskDocFun(), + JsonCtx = fabric2_util:user_ctx_to_json(TxDb), + try + lists:map(fun(VDU) -> + case VDU(Doc, DiskDoc, JsonCtx, Security) of + ok -> ok; + Error -> ?RETURN(Error) + end + end, VDUs), + ok + catch + throw:Error -> + Error + end + end, + Stat = [couchdb, query_server, vdu_process_time], + if VDUs == [] -> ok; true -> + couch_stats:update_histogram(Stat, Fun) + end. + + +validate_ddoc(Db, DDoc) -> + try + ok = couch_index_server:validate(Db, couch_doc:with_ejson_body(DDoc)) + catch + throw:{invalid_design_doc, Reason} -> + throw({bad_request, invalid_design_doc, Reason}); + throw:{compilation_error, Reason} -> + throw({bad_request, compilation_error, Reason}); + throw:Error -> + ?RETURN(Error) + end. + + +find_prev_known_rev(_Pos, []) -> + not_found; +find_prev_known_rev(Pos, [{_Rev, #doc{}} | RestPath]) -> + % doc records are skipped because these are the result + % of replication sending us an update. We're only interested + % in what we have locally since we're comparing attachment + % stubs. The replicator should never do this because it + % should only ever send leaves but the possibility exists + % so we account for it. + find_prev_known_rev(Pos - 1, RestPath); +find_prev_known_rev(Pos, [{_Rev, ?REV_MISSING} | RestPath]) -> + find_prev_known_rev(Pos - 1, RestPath); +find_prev_known_rev(Pos, [{_Rev, #leaf{}} | _] = DocPath) -> + {Pos, [Rev || {Rev, _Val} <- DocPath]}. + + +with_tx(#{tx := undefined} = Db, Fun) -> + fabric2_util:transactional(fun(Tx) -> + Fun(Db#{tx => Tx}) + end); + +with_tx(#{tx := {erlfdb_transaction, _}} = Db, Fun) -> + Fun(Db). + diff --git a/src/fabric/src/fabric2_doc.erl b/src/fabric/src/fabric2_doc.erl deleted file mode 100644 index d3ff04f39..000000000 --- a/src/fabric/src/fabric2_doc.erl +++ /dev/null @@ -1,494 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric2_doc). - - --export([ - get_fdi/2, - - open/3, - open_revs/4, - - update/3 -]). - - --include_lib("couch/include/couch_db.hrl"). - - --define(RETURN(Term), throw({?MODULE, Term})). - - -get_fdi(TxDb, DocId) -> - Future = fabric2_db:get(TxDb, {<<"docs">>, DocId}), - fdb_to_fdi(TxDb, DocId, erlfdb:wait(Future)). - - -open(TxDb, DocId, {Pos, [Rev | _] = Path}) -> - Key = {<<"revs">>, DocId, Pos, Rev}, - Future = fabric2_db:get(TxDb, Key), - fdb_to_doc(TxDb, DocId, Pos, Path, erlfdb:wait(Future)). - - -open_revs(TxDb, FDI, Revs, Options) -> - #full_doc_info{ - id = Id, - rev_tree = RevTree - } = FDI, - Latest = lists:member(latest, Options), - {Found, Missing} = case Revs of - all -> - {couch_key_tree:get_all_leafs(RevTree), []}; - _ when Latest -> - couch_key_tree:get_key_leafs(RevTree, Revs); - _ -> - couch_key_tree:get(RevTree, Revs) - end, - Docs = lists:map(fun({Value, {Pos, [Rev | _]} = RevPath}) -> - case Value of - ?REV_MISSING -> - % We have the rev in our list but know nothing about it - {{not_found, missing}, {Pos, Rev}}; - _ -> - case open(TxDb, Id, RevPath) of - #doc{} = Doc -> {ok, Doc}; - Else -> {Else, {Pos, Rev}} - end - end - end, Found), - MissingDocs = [{{not_found, missing}, MRev} || MRev <- Missing], - {ok, Docs ++ MissingDocs}. - - -% TODO: Handle _local docs separately. -update(TxDb, #doc{} = Doc0, Options) -> - UpdateType = case lists:member(replicated_changes, Options) of - true -> replicated_changes; - false -> interactive_edit - end, - - try - FDI1 = get_fdi(TxDb, Doc0#doc.id), - Doc1 = prep_and_validate(TxDb, FDI1, Doc0, UpdateType), - Doc2 = case UpdateType of - interactive_edit -> new_revid(Doc1); - replicated_changes -> Doc1 - end, - FDI2 = if FDI1 /= not_found -> FDI1; true -> - #full_doc_info{id = Doc2#doc.id} - end, - {FDI3, Doc3} = merge_rev_tree(FDI2, Doc2, UpdateType), - - #{tx := Tx} = TxDb, - - % Delete old entry in changes feed - OldSeqKey = {<<"changes">>, FDI3#full_doc_info.update_seq}, - OldSeqKeyBin = fabric2_db:pack(TxDb, OldSeqKey), - erlfdb:clear(Tx, OldSeqKeyBin), - - % Add new entry to changes feed - NewSeqKey = {<<"changes">>, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}}, - NewSeqKeyBin = fabric2_db:pack_vs(TxDb, NewSeqKey), - erlfdb:set_versionstamped_key(Tx, NewSeqKeyBin, Doc3#doc.id), - - {RevStart, [Rev | _]} = Doc3#doc.revs, - - % Write document data - {NewDocKey, NewDocVal} = doc_to_fdb(TxDb, Doc3), - erlfdb:set(Tx, NewDocKey, NewDocVal), - - % Update revision tree entry - {NewFDIKey, NewFDIVal} = fdi_to_fdb(TxDb, FDI3), - erlfdb:set_versionstamped_value(Tx, NewFDIKey, NewFDIVal), - - {DocIncr, DocDelIncr} = case {FDI1, FDI3} of - {not_found, _} -> - {1, 0}; - {#full_doc_info{deleted = true}, #full_doc_info{deleted = false}} -> - {1, -1}; - {#full_doc_info{deleted = false}, #full_doc_info{deleted = true}} -> - {-1, 1}; - _ -> - {0, 0} - end, - - DocCountKey = {<<"meta">>, <<"stats">>, <<"doc_count">>}, - DocCountKeyBin = fabric2_db:pack(TxDb, DocCountKey), - DocDelCountKey = {<<"meta">>, <<"stats">>, <<"doc_del_count">>}, - DocDelCountKeyBin = fabric2_db:pack(TxDb, DocDelCountKey), - - if DocIncr == 0 -> ok; true -> - erlfdb:add(Tx, DocCountKeyBin, DocIncr) - end, - - if DocDelIncr == 0 -> ok; true -> - erlfdb:add(Tx, DocDelCountKeyBin, DocDelIncr) - end, - - % TODO: Count design docs separately - - % And done. - {ok, {RevStart, Rev}} - catch throw:{?MODULE, Return} -> - Return - end. - - -prep_and_validate(TxDb, not_found, Doc, UpdateType) -> - case Doc#doc.revs of - {0, []} -> - ok; - _ when UpdateType == replicated_changes -> - ok; - _ -> - ?RETURN({error, conflict}) - end, - prep_and_validate(TxDb, Doc, fun() -> nil end); - -prep_and_validate(TxDb, FDI, Doc, interactive_edit) -> - #doc{ - revs = {Start, Revs} - } = Doc, - - Leafs = couch_key_tree:get_all_leafs(FDI#full_doc_info.rev_tree), - LeafRevs = lists:map(fun({_Leaf, {LeafStart, [LeafRev | _] = Path}}) -> - {{LeafStart, LeafRev}, Path} - end, Leafs), - - GetDocFun = case Revs of - [PrevRev | _] -> - case lists:keyfind({Start, PrevRev}, 1, LeafRevs) of - {{Start, PrevRev}, Path} -> - fun() -> open(TxDb, Doc#doc.id, {Start, Path}) end; - false -> - ?RETURN({error, conflict}) - end; - [] -> - case FDI#full_doc_info.deleted of - true -> - fun() -> nil end; - false -> - ?RETURN({error, conflict}) - end - end, - prep_and_validate(TxDb, Doc, GetDocFun); - -prep_and_validate(TxDb, FDI, Doc, replicated_changes) -> - #full_doc_info{ - rev_tree = RevTree - } = FDI, - OldLeafs = couch_key_tree:get_all_leafs_full(RevTree), - OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _} | _]} <- OldLeafs], - - NewPath = couch_doc:to_path(Doc), - NewRevTree = couch_key_tree:merge(RevTree, NewPath), - - Leafs = couch_key_tree:get_all_leafs_full(NewRevTree), - LeafRevsFull = lists:map(fun({Start, [{RevId, _} | _]} = FullPath) -> - [{{Start, RevId}, FullPath}] - end, Leafs), - LeafRevsFullDict = dict:from_list(LeafRevsFull), - - #doc{ - revs = {DocStart, [DocRev | _]} - } = Doc, - DocRevId = {DocStart, DocRev}, - - IsOldLeaf = lists:member(DocRevId, OldLeafsLU), - GetDocFun = case dict:find(DocRevId, LeafRevsFullDict) of - {ok, {DocStart, RevPath}} when not IsOldLeaf -> - % An incoming replicated edit only sends us leaf - % nodes which may have included multiple updates - % we haven't seen locally. Thus we have to search - % back through the tree to find the first edit - % we do know about. - case find_prev_known_rev(DocStart, RevPath) of - not_found -> fun() -> nil end; - PrevRevs -> fun() -> open(TxDb, Doc#doc.id, PrevRevs) end - end; - _ -> - % The update merged to an internal node that we - % already know about which means we're done with - % this update. - ?RETURN({ok, []}) - end, - - prep_and_validate(TxDb, Doc, GetDocFun). - - -prep_and_validate(TxDb, Doc, GetDocBody) -> - NewDoc = case couch_doc:has_stubs(Doc) of - true -> - case GetDocBody() of - #doc{} = PrevDoc -> - couch_doc:merge_stubs(Doc, PrevDoc); - _ -> - % Force a missing stubs error - couch_doc:mege_stubs(Doc, #doc{}) - end; - false -> - Doc - end, - validate_doc_update(TxDb, NewDoc, GetDocBody), - NewDoc. - - -merge_rev_tree(FDI, Doc, interactive_edit) when FDI#full_doc_info.deleted -> - % We're recreating a document that was previously - % deleted. To check that this is a recreation from - % the root we assert that the new document has a - % revision depth of 1 (this is to avoid recreating a - % doc from a previous internal revision) and is also - % not deleted. To avoid expanding the revision tree - % unnecessarily we create a new revision based on - % the winning deleted revision. - - {RevDepth, _} = Doc#doc.revs, - case RevDepth == 1 andalso not Doc#doc.deleted of - true -> - % Update the new doc based on revisions in OldInfo - #doc_info{revs=[WinningRev | _]} = couch_doc:to_doc_info(FDI), - #rev_info{rev={OldPos, OldRev}} = WinningRev, - Body = case couch_util:get_value(comp_body, Doc#doc.meta) of - CompBody when is_binary(CompBody) -> - couch_compress:decompress(CompBody); - _ -> - Doc#doc.body - end, - NewDoc = new_revid(Doc#doc{ - revs = {OldPos, [OldRev]}, - body = Body - }), - - % Merge our modified new doc into the tree - #full_doc_info{rev_tree = RevTree} = FDI, - case couch_key_tree:merge(RevTree, couch_doc:to_path(NewDoc)) of - {NewRevTree, new_leaf} -> - % We changed the revision id so inform the caller - NewFDI = FDI#full_doc_info{ - rev_tree = NewRevTree, - deleted = false - }, - {NewFDI, NewDoc}; - _ -> - throw(doc_recreation_failed) - end; - _ -> - ?RETURN({error, conflict}) - end; -merge_rev_tree(FDI, Doc, interactive_edit) -> - % We're attempting to merge a new revision into an - % undeleted document. To not be a conflict we require - % that the merge results in extending a branch. - - RevTree = FDI#full_doc_info.rev_tree, - case couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)) of - {NewRevTree, new_leaf} when not Doc#doc.deleted -> - NewFDI = FDI#full_doc_info{ - rev_tree = NewRevTree, - deleted = false - }, - {NewFDI, Doc}; - {NewRevTree, new_leaf} when Doc#doc.deleted -> - % We have to check if we just deleted this - % document completely or if it was a conflict - % resolution. - NewFDI = FDI#full_doc_info{ - rev_tree = NewRevTree, - deleted = couch_doc:is_deleted(NewRevTree) - }, - {NewFDI, Doc}; - _ -> - ?RETURN({error, conflict}) - end; -merge_rev_tree(FDI, Doc, replicated_changes) -> - % We're merging in revisions without caring about - % conflicts. Most likely this is a replication update. - RevTree = FDI#full_doc_info.rev_tree, - {NewRevTree, _} = couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)), - NewFDI = FDI#full_doc_info{ - rev_tree = NewRevTree, - deleted = couch_doc:is_deleted(NewRevTree) - }, - % If a replicated change did not modify the revision - % tree then we've got nothing else to do. - if NewFDI /= FDI -> ok; true -> - ?RETURN({ok, []}) - end, - {NewFDI, Doc}. - - - -validate_doc_update(TxDb, #doc{id = <<"_design/", _/binary>>} = Doc, _) -> - case catch fabric2_db:check_is_admin(TxDb) of - ok -> validate_ddoc(TxDb, Doc); - Error -> ?RETURN(Error) - end; -validate_doc_update(_TxDb, #doc{id = <<"_local/", _/binary>>}, _) -> - ok; -validate_doc_update(TxDb, Doc, GetDiskDocFun) -> - Fun = fun() -> - DiskDoc = GetDiskDocFun(), - JsonCtx = couch_util:json_user_ctx(TxDb), - SecObj = fabric2_db:get_security(TxDb), - try - lists:map(fun(VDU) -> - case VDU(Doc, DiskDoc, JsonCtx, SecObj) of - ok -> ok; - Error -> ?RETURN(Error) - end - end, fabric2_db:get_vdus(TxDb)), - ok - catch - throw:Error -> - Error - end - end, - Stat = [couchdb, query_server, vdu_process_time], - case fabric2_db:get_vdus(TxDb) of - [] -> - ok; - _ -> - couch_stats:update_histogram(Stat, Fun) - end. - - -validate_ddoc(TxDb, DDoc) -> - try - ok = couch_index_server:validate(TxDb, couch_doc:with_ejson_body(DDoc)) - catch - throw:{invalid_design_doc, Reason} -> - throw({bad_request, invalid_design_doc, Reason}); - throw:{compilation_error, Reason} -> - throw({bad_request, compilation_error, Reason}); - throw:Error -> - ?RETURN(Error) - end. - - -find_prev_known_rev(_Pos, []) -> - not_found; -find_prev_known_rev(Pos, [{_Rev, #doc{}} | RestPath]) -> - % doc records are skipped because these are the result - % of replication sending us an update. We're only interested - % in what we have locally since we're comparing attachment - % stubs. The replicator should never do this because it - % should only ever send leaves but the possibility exists - % so we account for it. - find_prev_known_rev(Pos - 1, RestPath); -find_prev_known_rev(Pos, [{_Rev, ?REV_MISSING} | RestPath]) -> - find_prev_known_rev(Pos - 1, RestPath); -find_prev_known_rev(Pos, [{_Rev, #leaf{}} | _] = DocPath) -> - {Pos, [Rev || {Rev, _Val} <- DocPath]}. - - -new_revid(Doc) -> - #doc{ - body = Body, - revs = {OldStart, OldRevs}, - atts = Atts, - deleted = Deleted - } = Doc, - - DigestedAtts = lists:foldl(fun(Att, Acc) -> - [N, T, M] = couch_att:fetch([name, type, md5], Att), - case M == <<>> of - true -> Acc; - false -> [{N, T, M} | Acc] - end - end, [], Atts), - - Rev = case DigestedAtts of - Atts2 when length(Atts) =/= length(Atts2) -> - % We must have old style non-md5 attachments - list_to_binary(integer_to_list(couch_util:rand32())); - Atts2 -> - OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end, - SigTerm = [Deleted, OldStart, OldRev, Body, Atts2], - couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}])) - end, - - Doc#doc{revs = {OldStart + 1, [Rev | OldRevs]}}. - - -doc_to_fdb(TxDb, #doc{} = Doc) -> - #doc{ - id = Id, - revs = {Start, [Rev | _]}, - body = Body, - atts = Atts, - deleted = Deleted - } = Doc, - Key = {<<"revs">>, Id, Start, Rev}, - KeyBin = fabric2_db:pack(TxDb, Key), - Val = {Body, Atts, Deleted}, - {KeyBin, term_to_binary(Val, [{minor_version, 1}])}. - - -fdb_to_doc(_TxDb, DocId, Pos, Path, Bin) when is_binary(Bin) -> - {Body, Atts, Deleted} = binary_to_term(Bin, [safe]), - #doc{ - id = DocId, - revs = {Pos, Path}, - body = Body, - atts = Atts, - deleted = Deleted - }; -fdb_to_doc(_TxDb, _DocId, _Pos, _Path, not_found) -> - {not_found, missing}. - - -fdi_to_fdb(TxDb, #full_doc_info{} = FDI) -> - #full_doc_info{ - id = Id, - deleted = Deleted, - rev_tree = RevTree - } = flush_tree(FDI), - - Key = {<<"docs">>, Id}, - KeyBin = fabric2_db:pack(TxDb, Key), - RevTreeBin = term_to_binary(RevTree, [{minor_version, 1}]), - ValTuple = {Deleted, RevTreeBin, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}}, - Val = fabric2_db:pack_vs(TxDb, ValTuple), - {KeyBin, Val}. - - -fdb_to_fdi(TxDb, Id, Bin) when is_binary(Bin) -> - {Deleted, RevTreeBin, {versionstamp, V, B}} = fabric2_db:unpack(TxDb, Bin), - RevTree = binary_to_term(RevTreeBin, [safe]), - UpdateSeq = <<V:64/big, B:16/big>>, - #full_doc_info{ - id = Id, - deleted = Deleted, - rev_tree = RevTree, - update_seq = UpdateSeq - }; -fdb_to_fdi(_TxDb, _Id, not_found) -> - not_found. - - -flush_tree(FDI) -> - #full_doc_info{ - rev_tree = Unflushed - } = FDI, - - Flushed = couch_key_tree:map(fun(_Rev, Value) -> - case Value of - #doc{deleted = Del} -> #leaf{deleted = Del}; - _ -> Value - end - end, Unflushed), - - FDI#full_doc_info{ - rev_tree = Flushed - }. diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl new file mode 100644 index 000000000..4764331ae --- /dev/null +++ b/src/fabric/src/fabric2_fdb.erl @@ -0,0 +1,490 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_fdb). + + +-export([ + init/3, + + create/1, + open/1, + delete/1, + exists/1, + is_current/1, + + get_info/1, + get_config/1, + set_config/3, + + get_stat/2, + incr_stat/3, + + get_full_doc_info/2, + get_doc_body/3, + + store_doc/3, + + get_changes/2 +]). + + +% This will eventually be the `\xFFmetadataVersion` key that is +% currently only available in FoundationDB master. +% +% https://forums.foundationdb.org/t/a-new-tool-for-managing-layer-metadata/1191 +% +% Until then we'll fake the same behavior using a randomish +% key for tracking metadata changse. Once we get to the +% new feature this will be more performant by updating +% this define. +-define(METADATA_VERSION_KEY, <<"$metadata_version_key$">>). + + +% Prefix Definitions + +-define(CLUSTER_CONFIG, 0). +-define(ALL_DBS, 1). +-define(DBS, 15). +-define(DB_CONFIG, 16). +-define(DB_STATS, 17). +-define(DB_ALL_DOCS, 18). +-define(DB_CHANGES, 19). +-define(DB_DOCS, 20). +-define(DB_REVS, 21). + + +% Various utility macros + +-define(REQUIRE_TX(Db), {erlfdb_transaction, _} = maps:get(Db, tx)). +-define(REQUIRE_CURRENT(Db), true = db_is_current(Db)). + +-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}). + + +init(Tx, DbName, Options) -> + Root = erlfdb_directory:get_root(), + CouchDB = erlfdb_directory:create_or_open(Root, [<<"couchdb">>]), + Prefix = erlfdb_directory:get_name(CouchDB), + #{ + name => DbName, + tx => Tx, + layer_prefix => Prefix, + options => Options + }. + + +create(#{} = Db) -> + ?REQUIRE_TX(Db), + #{ + name := DbName, + tx := Tx, + layer_prefix := LayerPrefix + } = Db, + + % Eventually DbPrefix will be HCA allocated. For now + % we're just using the DbName so that debugging is easier. + DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix), + DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix), + ets:set(Tx, DbKey, DbPrefix), + + UUID = fabric2_util:uuid(), + + Defaults = [ + {?DB_CONFIG, <<"uuid">>, UUID}, + {?DB_CONFIG, <<"revs_limit">>, ?uint2bin(1000)}, + {?DB_CONFIG, <<"security_doc">>, <<"{}">>}, + {?DB_STATS, <<"doc_count">>, ?uint2bin(0)}, + {?DB_STATS, <<"doc_del_count">>, ?uint2bin(0)}, + {?DB_STATS, <<"doc_design_count">>, ?uint2bin(0)}, + {?DB_STATS, <<"doc_local_count">>, ?uint2bin(0)}, + {?DB_STATS, <<"size">>, ?uint2bin(2)} + ], + lists:foreach(fun({P, K, V}) -> + Key = erlfdb_tuple:pack({P, K}, DbPrefix), + erlfdb:set(Tx, Key, V) + end, Defaults), + + Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)), + + Db#{ + uuid => UUID, + db_prefix => DbPrefix, + version => Version, + revs_limit => 1000, + security_doc => {[]} + validate_doc_update_funs => [], + + user_ctx => #user_ctx{}, + + before_doc_update => undefined, + after_doc_update => undefined + % All other db things as we add features + }. + + +open(#{} = Db0) -> + ?REQUIRE_TX(Db0), + #{ + name := DbName, + tx := Tx, + layer_prefix := LayerPrefix + } = Db0, + + DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix), + DbPrefix = case erlfdb:wait(erlfdb:get(Tx, DbKey)) of + Bin when is_binary(Bin) -> Bin; + not_found -> erlang:error(database_does_not_exist) + end, + + Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)), + + Db1 = Db0#{ + db_preix => DbPrefix, + version => Version, + + user_ctx => #user_ctx{}, + + % Place holders until we implement these + % bits. + validate_doc_upate_funs => [], + before_doc_update => undefined, + after_doc_read => undefined + }, + + Config = db_get_config(Db1), + lists:foldl(fun({Key, Val}) -> + case Key of + <<"uuid">> -> + Db1#{uuid => Val}; + <<"revs_limit">> -> + Db1#{revs_limit => ?bin2uint(Val)}; + <<"security_doc">> -> + Db1#{security_doc => ?JSON_DECODE(Val)} + end + end, Db1, db_get_config(Db1)). + + +delete(#{} = Db) -> + ?REQUIRE_CURRENT(Db), + #{ + name := DbName, + tx := Tx, + layer_prefix := LayerPrefix, + db_prefix := DbPrefix + } = Db, + + DbKey = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix), + erlfdb:clear(Tx, DbKey), + erlfdb:clear_range_startswith(Tx, DbPrefix), + bump_metadata_version(), + ok. + + +exists(#{name := DbName} = Db) when is_binary(DbName) -> + ?REQUIRE_TX(Db), + #{ + tx := Tx, + layer_prefix := LayerPrefix + } = Db, + + DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix), + case erlfdb:wait(erlfdb:get(Tx, DbKey)) of + Bin when is_binary(Bin) -> true; + not_found -> false + end. + + +is_current(#{} = Db) -> + ?REQUIRE_TX(Db), + #{ + name := DbName, + md_version := MetaDataVersion + } = Db, + + case erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)) of + MetaDataVersion -> true; + _NewVersion -> false + end. + + +get_info(#{} = Db) -> + ?REQUIRE_CURRENT(Db), + #{ + name := DbName, + tx := Tx, + db_prefix := DbPrefix + } = Db, + + {CStart, CEnd} = erlfdb_tuple:pack({?DB_CHANGES}, DbPrefix), + ChangesFuture = erlfdb:get_range(Tx, CStart, CEnd, [ + {streaming_mode, exact}, + {limit, 1}, + {reverse, true} + ]), + + StatsPrefix = erlfdb_tuple:pack(?DB_STATS, DbPrefix), + MetaFuture = erlfdb:get_range_startswith(Tx, StatsPrefix), + + RawSeq = case erlfdb:wait(ChangesFuture) of + [] -> + <<0:80>>; + [{SeqKey, _}] -> + {?DB_CHANGES, SeqBin} = erlfdb_tuple:unpack(SeqKey, DbPrefix), + SeqBin + end, + CProp = {update_seq, fabric2_util:to_hex(RawSeq)}, + + MProps = lists:flatmap(fun({K, V}) -> + case erlfdb_tuple:unpack(K, DbPrefix) of + {?DB_STATS, <<"doc_count">>} -> + [{doc_count, ?bin2uint(V)}]; + {?DB_STATS, <<"doc_del_count">>} -> + [{doc_del_count, ?bin2uint(V)}]; + {?DB_STATS, <<"size">>} -> + Val = ?bin2uint(V), + [ + {other, {[{data_size, Val}]}}, + {sizes, {[ + {active, 0}, + {external, Val}, + {file, 0} + ]}} + ]; + {?DB_STATS, _} -> + [] + end + end, erlfdb:wait(MetaFuture)), + + [CProp | MProps]. + + +get_config(#{} = Db) -> + ?REQUIRE_CURRENT(Db), + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db, + + {Start, End} = erlfdb_tuple:pack({?DB_CONFIG}, DbPrefix), + Future = erlfdb:get_range(Tx, Start, End), + + lists:map(fun(K, V) -> + {?DB_CONFIG, Key} = erlfdb_tuple:unpack(K, DbPrefix), + {Key, V} + end, erlfdb:wait(Future)). + + +set_config(#{} = Db, ConfigKey, ConfigVal) -> + ?REQUIRE_CURRENT(Db), + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db, + + Key = erlfdb_tuple:pack({?DB_CONFIG, ConfigKey}, DbPrefix), + erlfdb:set(Tx, Key, ConfigVal). + + +get_stat(#{} = Db, StatKey) -> + ?REQUIRE_CURRENT(Db), + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db, + + Key = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix), + + % Might need to figure out some sort of type + % system here. Uints are because stats are all + % atomic op adds for the moment. + ?bin2uint(erlfdb:wait(erlfdb:get(Tx, Key))), + bump_metadata_version(Tx). + + +incr_stat(_Db, _Statey, 0) -> + ok; + +incr_stat(#{} = Db, StatKey, Increment) when is_integer(Increment) -> + ?REQUIRE_CURRENT(Db), + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db, + + Key = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix), + erlfdb:add(Tx, Key, Increment). + + +get_full_doc_info(#{} = Db, DocId) -> + ?REQUIRE_CURRENT(Db), + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db, + + Key = erlfdb_tuple:pack({?DB_DOCS, DocId}, DbPrefix), + Val = erlfdb:wait(erlfdb:get(Tx, Key)), + fdb_to_fdi(Db, DocId, Val). + + +get_doc_body(#{} = Db, DocId, {Pos, [Rev | _]} = Path) -> + ?REQUIRE_CURRENT(Db), + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db, + + Key = erlfdb_tupe:pack({?DB_REVS, DocId, Pos, Rev}, DbPrefix), + Val = erlfdb:wait(erlfdb:get(Tx, Key)), + fdb_to_doc(Db, DocId, Pos, Path, Val). + + +store_doc(#{} = Db, #full_doc_info{} = FDI, #doc{} = Doc) -> + ?REQUIRE_CURRENT(Db), + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db, + + #full_doc_info{ + id = DocId, + update_seq = OldUpdateSeq + } = FDI, + + #doc{ + revs = {RevStart, [Rev | _]} + } = Doc, + + % Delete old entry in changes feed + OldSeqKey = erlfdb_tuple:pack({?DB_CHANGES, OldUpdateSeq}, DbPrefix), + erlfdb:clear(Tx, OldSeqKey), + + % Add new entry to changes feed + NewSeqKey = erlfdb_tuple:pack({?DB_CHANGES, ?UNSET_VS}, DbPrefix), + erlfdb:set_versionstamped_key(Tx, NewSeqKey, DocId), + + % Write document data + {NewDocKey, NewDocVal} = doc_to_fdb(Db, Doc), + erlfdb:set(Tx, NewDocKey, NewDocVal), + + % Update revision tree entry + {NewFDIKey, NewFDIVal} = fdi_to_fdb(TxDb, FDI), + erlfdb:set_versionstamped_value(Tx, NewFDIKey, NewFDIVal). + + +get_changes(#{} = Db, Options) -> + ?REQUIRE_CURRENT(Db), + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db, + + {CStart, CEnd} = erlfdb_tuple:pack({?DB_CHANGES}, DbPrefix), + Future = erlfdb:get_range(Tx, CStart, CEnd, Options), + lists:map(fun({Key, Val}) -> + {?DB_CHANGES, Seq} = erlfdb_tuple:unpack(Key, DbPrefix), + {fabric2_util:to_hex(Seq), Val} + end, erlfdb:wait(Future)). + + + + +bump_metadata_version(Tx) -> + % The 14 zero bytes is pulled from the PR for adding the + % metadata version key. Not sure why 14 bytes when version + % stamps are only 80, but whatever for now. + erlfdb:set_versionstamped_value(Tx, ?METADATA_VERSION_KEY, <<0:112>>). + + +doc_to_fdb(Db, #doc{} = Doc) -> + #{ + db_prefix := DbPrefix + } = Db, + + #doc{ + id = Id, + revs = {Start, [Rev | _]}, + body = Body, + atts = Atts, + deleted = Deleted + } = Doc, + + Key = erlfdb_tuple:pack({?DB_REVS, Id, Start, Rev}, DbPrefix), + Val = {Body, Atts, Deleted}, + {Key, term_to_binary(Val, [{minor_version, 1}])}. + + +fdb_to_doc(_Db, DocId, Pos, Path, Bin) when is_binary(Bin) -> + {Body, Atts, Deleted} = binary_to_term(Bin, [safe]), + #doc{ + id = DocId, + revs = {Pos, Path}, + body = Body, + atts = Atts, + deleted = Deleted + }; +fdb_to_doc(_Db, _DocId, _Pos, _Path, not_found) -> + {not_found, missing}. + + +fdi_to_fdb(Db, #full_doc_info{} = FDI) -> + #{ + db_prefix := DbPrefix + } = Db, + + #full_doc_info{ + id = Id, + deleted = Deleted, + rev_tree = RevTree + } = flush_tree(FDI), + + Key = erlfdb_tuple:pack({?DB_DOCS, Id}, DbPrefix), + RevTreeBin = term_to_binary(RevTree, [{minor_version, 1}]), + ValTuple = { + Deleted, + RevTreeBin, + {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF} + }, + Val = erlfdb_tuple:pack_vs(ValTuple), + {Key, Val}. + + +flush_tree(FDI) -> + #full_doc_info{ + rev_tree = Unflushed + } = FDI, + + Flushed = couch_key_tree:map(fun(_Rev, Value) -> + case Value of + #doc{deleted = Del} -> #leaf{deleted = Del}; + _ -> Value + end + end, Unflushed), + + FDI#full_doc_info{ + rev_tree = Flushed + }. + + +fdb_to_fdi(_Db, Id, Bin) when is_binary(Bin) -> + {Deleted, RevTreeBin, {versionstamp, V, B}} = erlfdb_tuple:unpack(Bin), + RevTree = binary_to_term(RevTreeBin, [safe]), + UpdateSeq = <<V:64/big, B:16/big>>, + #full_doc_info{ + id = Id, + deleted = Deleted, + rev_tree = RevTree, + update_seq = UpdateSeq + }; +fdb_to_fdi(_Db, _Id, not_found) -> + not_found.
\ No newline at end of file diff --git a/src/fabric/src/fabric_security.erl b/src/fabric/src/fabric2_security.erl index b3f13886d..b3f13886d 100644 --- a/src/fabric/src/fabric_security.erl +++ b/src/fabric/src/fabric2_security.erl diff --git a/src/fabric/src/fabric2_server.erl b/src/fabric/src/fabric2_server.erl new file mode 100644 index 000000000..58223315a --- /dev/null +++ b/src/fabric/src/fabric2_server.erl @@ -0,0 +1,91 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_server). +-behaviour(gen_server). +-vsn(1). + + +-export([ + start_link/0, + fetch/1, + store/1 +]). + + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-define(CLUSTER_FILE, "/usr/local/etc/foundationdb/fdb.cluster"). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +fetch(DbName) when is_binary(DbName) -> + case ets:lookup(?MODULE, DbName) of + [{DbName, #{} = Db] -> Db; + [] -> undefined + end. + + +store(#{name := DbName} = Db0) when is_binary(DbName) -> + Db1 = Db0#{ + tx => undefined, + user_ctx => #user_ctx{} + }, + true = ets:insert(?MODULE, {DbName, Db1}}), + Db1. + + +init(_) -> + ets:new(?MODULE, [ + public, + named_table, + {read_concurrency, true}, + {write_concurrency, true} + ]), + + ClusterStr = config:get("erlfdb", "cluster_file", ?CLUSTER_FILE), + Db = erlfdb:open(iolist_to_binary(ClusterStr)), + application:set_env(fabric, db, Db), + init_cluster(Db), + + {ok, nil}. + + +terminate(_, _St) -> + ok. + + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast(Msg, St) -> + {stop, {bad_cast, Msg}, St}. + + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. diff --git a/src/fabric/src/fabric_sup.erl b/src/fabric/src/fabric2_sup.erl index 54636638c..00eea3ed7 100644 --- a/src/fabric/src/fabric_sup.erl +++ b/src/fabric/src/fabric2_sup.erl @@ -10,7 +10,7 @@ % License for the specific language governing permissions and limitations under % the License. --module(fabric_sup). +-module(fabric2_sup). -behaviour(supervisor). -vsn(1). @@ -36,8 +36,8 @@ init([]) -> }, Children = [ #{ - id => fabric_server, - start => {fabric_server, start_link, []} + id => fabric2_server, + start => {fabric2_server, start_link, []} } ], {ok, {Flags, Children}}. diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl new file mode 100644 index 000000000..1c47222f6 --- /dev/null +++ b/src/fabric/src/fabric2_util.erl @@ -0,0 +1,109 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_util). + + +-export([ + transactional/2, + transactional/3, + + get_db_handle/0, + + user_ctx_to_json/1, + + uuid/0, + + to_hex/1, + + debug_cluster/0, + debug_cluster/2 +]). + + +-define(PDICT_DB_KEY, '$erlfdb_handle'). + + +trasactional(Fun) when is_function(Fun, 1) -> + Db = get_db_handle(), + erlfdb:transactional(Db, Fun). + + +get_db_handle() -> + case get(?PDICT_DB_KEY) of + undefined -> + {ok, Db} = application:get_env(fabric, db), + put(?PDICT_DB_KEY, Db), + Db; + Db -> + Db + end. + + +user_ctx_to_json(Db) -> + UserCtx = fabric2_db:get_user_ctx(Db), + {[ + {<<"db">>, fabric2_db:name(Db)}, + {<<"name">>, Ctx#user_ctx.name}, + {<<"roles">>, Ctx#user_ctx.roles} + ]}. + + + +uuid() -> + to_hex(crypto:strong_rand_bytes(16)). + + +to_hex(Bin) -> + list_to_binary(to_hex_int(Bin)). + + +to_hex_int(<<>>) -> + []; +to_hex_int(<<Hi:4, Lo:4, Rest/binary>>) -> + [nibble_to_hex(Hi), nibble_to_hex(Lo) | to_hex(Rest)]; + + +nibble_to_hex(I) -> + case I of + 0 -> $0; + 1 -> $1; + 2 -> $2; + 3 -> $3; + 4 -> $4; + 5 -> $5; + 6 -> $6; + 7 -> $7; + 8 -> $8; + 9 -> $9; + 10 -> $a; + 11 -> $b; + 12 -> $c; + 13 -> $d; + 14 -> $e; + 15 -> $f + end. + + +debug_cluster() -> + debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>). + + +debug_cluster(Start, End) -> + transactional(fun(Tx) -> + lists:foreach(fun({Key, Val}) -> + io:format("~s => ~s~n", [ + string:pad(erlfdb_util:repr(Key), 60), + erlfdb_util:repr(Val) + ]) + end, erlfdb:get_range(Tx, Start, End)) + end).
\ No newline at end of file diff --git a/src/fabric/src/fabric_server.erl b/src/fabric/src/fabric_server.erl deleted file mode 100644 index 45e091f48..000000000 --- a/src/fabric/src/fabric_server.erl +++ /dev/null @@ -1,139 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_server). --behaviour(gen_server). --vsn(1). - - --export([ - start_link/0, - transactional/1, - get_dir/1, - - debug_cluster/0, - debug_cluster/2 -]). - - --export([ - init/1, - terminate/2, - handle_call/3, - handle_cast/2, - handle_info/2, - code_change/3 -]). - - --record(st, { - db -}). - - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - - - -transactional(Fun) when is_function(Fun, 1) -> - [{'$handle$', Db}] = ets:lookup(?MODULE, '$handle$'), - erlfdb:transactional(Db, Fun). - - -get_dir(Name) -> - case ets:lookup(?MODULE, Name) of - [{Name, Dir}] -> Dir; - [] -> not_found - end. - - -debug_cluster() -> - debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>). - - -debug_cluster(Start, End) -> - transactional(fun(Tx) -> - lists:foreach(fun({Key, Val}) -> - io:format("~s => ~s~n", [ - string:pad(erlfdb_util:repr(Key), 60), - erlfdb_util:repr(Val) - ]) - end, erlfdb:get_range(Tx, Start, End)) - end). - - -init(_) -> - ets:new(?MODULE, [ - protected, - named_table, - {read_concurrency, true} - ]), - - ClusterStr = config:get("erlfdb", "cluster_file", "/usr/local/etc/foundationdb/fdb.cluster"), - Db = erlfdb:open(iolist_to_binary(ClusterStr)), - Dirs = init_cluster(Db), - - ets:insert(?MODULE, {'$handle$', Db}), - lists:foreach(fun({K, V}) -> - ets:insert(?MODULE, {K, V}) - end, Dirs), - - {ok, #st{db = Db}}. - - -terminate(_, _St) -> - ok. - - -handle_call(Msg, _From, St) -> - {stop, {bad_call, Msg}, {bad_call, Msg}, St}. - - -handle_cast(Msg, St) -> - {stop, {bad_cast, Msg}, St}. - - -handle_info(Msg, St) -> - {stop, {bad_info, Msg}, St}. - - -code_change(_OldVsn, St, _Extra) -> - {ok, St}. - - - -init_cluster(Db) -> - Dirs = erlfdb:transactional(Db, fun(Tx) -> - Root = erlfdb_directory:root(), - CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), - Dbs = erlfdb_directory:create_or_open(Tx, CouchDB, [<<"dbs">>]), - Config = erlfdb_directory:create_or_open(Tx, CouchDB, [ - <<"meta">>, - <<"config">> - ]), - [ - {root, Root}, - {config, Config}, - {dbs, Dbs} - ] - end), - drain_ready(Dirs). - - -drain_ready(Dirs) -> - receive - {Ref, ready} when is_reference(Ref) -> - drain_ready(Dirs) - after 100 -> - Dirs - end.
\ No newline at end of file |