summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2019-03-26 15:20:47 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2019-03-26 15:20:47 -0500
commitf6e64ec5fceb4b4f2abd60ae5c4e9a8451eddd85 (patch)
treef08316dcdff3dfda2e1194215f47d8dec39aaf0c
parent6221795b5494e3bb08bc7afd4f10b0d78270bed8 (diff)
downloadcouchdb-f6e64ec5fceb4b4f2abd60ae5c4e9a8451eddd85.tar.gz
WIP: Refactoring fdb layer code
-rw-r--r--src/fabric/include/fabric2.hrl21
-rw-r--r--src/fabric/src/fabric.app.src2
-rw-r--r--src/fabric/src/fabric2.erl219
-rw-r--r--src/fabric/src/fabric2_app.erl (renamed from src/fabric/src/fabric_app.erl)4
-rw-r--r--src/fabric/src/fabric2_db.erl808
-rw-r--r--src/fabric/src/fabric2_doc.erl494
-rw-r--r--src/fabric/src/fabric2_fdb.erl490
-rw-r--r--src/fabric/src/fabric2_security.erl (renamed from src/fabric/src/fabric_security.erl)0
-rw-r--r--src/fabric/src/fabric2_server.erl91
-rw-r--r--src/fabric/src/fabric2_sup.erl (renamed from src/fabric/src/fabric_sup.erl)6
-rw-r--r--src/fabric/src/fabric2_util.erl109
-rw-r--r--src/fabric/src/fabric_server.erl139
12 files changed, 1403 insertions, 980 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
new file mode 100644
index 000000000..e56f84fdc
--- /dev/null
+++ b/src/fabric/include/fabric2.hrl
@@ -0,0 +1,21 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-define(SYSTEM_DATABASES, [
+ <<"_dbs">>,
+ <<"_global_changes">>,
+ <<"_metadata">>,
+ <<"_nodes">>,
+ <<"_replicator">>,
+ <<"_users">>
+]).
diff --git a/src/fabric/src/fabric.app.src b/src/fabric/src/fabric.app.src
index f59108290..20fbb1e2a 100644
--- a/src/fabric/src/fabric.app.src
+++ b/src/fabric/src/fabric.app.src
@@ -13,7 +13,7 @@
{application, fabric, [
{description, "Routing and proxying layer for CouchDB cluster"},
{vsn, git},
- {mod, {fabric_app, []}},
+ {mod, {fabric2_app, []}},
{registered, [
fabric_server
]},
diff --git a/src/fabric/src/fabric2.erl b/src/fabric/src/fabric2.erl
index a0d67d8a0..6acf23f48 100644
--- a/src/fabric/src/fabric2.erl
+++ b/src/fabric/src/fabric2.erl
@@ -13,231 +13,12 @@
-module(fabric2).
--export([
- %% % Databases
- %% all_dbs/0,
- %% all_dbs/1,
- create_db/1,
- create_db/2,
- delete_db/1,
- delete_db/2,
- open_db/1,
- open_db/2,
- get_db_info/1,
- get_doc_count/1,
- get_doc_count/2,
- %% get_revs_limit/1,
- %% set_revs_limit/3,
- get_security/1,
- set_security/2,
-
- %% get_purge_infos_limit/1,
- %% set_purge_infos_limit/3,
- %%
- %% compact/1,
- %% compact/2,
- %%
- %% get_partition_info/2,
- %%
- % Documents
- open_doc/3,
- open_revs/4,
- %%
- %% get_doc_info/3,
- %% get_full_doc_info/3,
- %%
- %% get_missing_revs/2,
- %% get_missing_revs/3,
-
- update_doc/3,
- update_docs/3
-
- %% purge_docs/3,
- %%
- %% att_receiver/2,
- %%
- %% % Views
- %% all_docs/4,
- %% all_docs/5,
- %%
- %% changes/4,
- %% end_changes/0,
- %%
- %% query_view/3,
- %% query_view/4,
- %% query_view/6,
- %% query_view/7,
- %% get_view_group_info/2,
- %%
- %% % Miscellany
- %% design_docs/1,
- %%
- %% reset_validation_funs/1,
- %%
- %% cleanup_index_files/0,
- %% cleanup_index_files/1,
- %% cleanup_index_files_all_nodes/1,
- %%
- %% dbname/1
-]).
-
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-
--define(DEFAULT_SECURITY, <<"{}">>).
-
-
-create_db(DbName) ->
- create_db(DbName, []).
-
-
-create_db(DbName, _Options) ->
- DbsDir = fabric_server:get_dir(dbs),
- fabric_server:transactional(fun(Tx) ->
- try
- DbDir = erlfdb_directory:create(Tx, DbsDir, DbName),
- fabric2_db:init(DbName, Tx, DbDir)
- catch error:{erlfdb_directory, {create_error, path_exists, _}} ->
- {error, file_exists}
- end
- end).
-
-
-delete_db(DbName) ->
- delete_db(DbName, []).
-
-
-delete_db(DbName, _Options) ->
- try
- DbsDir = fabric_server:get_dir(dbs),
- fabric_server:transactional(fun(Tx) ->
- erlfdb_directory:remove(Tx, DbsDir, DbName)
- end)
- catch error:{erlfdb_directory, {remove_error, path_missing, _}} ->
- erlang:error(database_does_not_exist)
- end.
-
-
-open_db(DbName) ->
- open_db(DbName, []).
-
-
-open_db(DbName, Options) ->
- fabric2_db:open(DbName, Options).
-
-
-get_db_info(DbName) when is_binary(DbName) ->
- get_db_info(open_db(DbName));
-
-get_db_info(Db) ->
- DbProps = fabric2_db:with_tx(Db, fun(TxDb) ->
- {CStart, CEnd} = fabric2_db:range_bounds(TxDb, {<<"changes">>}),
- ChangesFuture = fabric2_db:get_range(TxDb, {CStart}, {CEnd}, [
- {streaming_mode, exact},
- {limit, 1},
- {reverse, true}
- ]),
-
- StatsPrefix = {<<"meta">>, <<"stats">>},
- MetaFuture = fabric2_db:get_range_startswith(TxDb, StatsPrefix),
-
- RawSeq = case erlfdb:wait(ChangesFuture) of
- [] ->
- <<0:80>>;
- [{SeqKey, _}] ->
- {<<"changes">>, SeqBin} = fabric2_db:unpack(TxDb, SeqKey),
- SeqBin
- end,
- CProp = {update_seq, ?l2b(couch_util:to_hex(RawSeq))},
-
- MProps = lists:flatmap(fun({K, V}) ->
- case fabric2_db:unpack(TxDb, K) of
- {_, _, <<"doc_count">>} ->
- [{doc_count, ?bin2uint(V)}];
- {_, _, <<"doc_del_count">>} ->
- [{doc_del_count, ?bin2uint(V)}];
- {_, _, <<"size">>} ->
- Val = ?bin2uint(V),
- [
- {other, {[{data_size, Val}]}},
- {sizes, {[
- {active, 0},
- {external, Val},
- {file, 0}
- ]}}
- ];
- _ ->
- []
- end
- end, erlfdb:wait(MetaFuture)),
-
- [CProp | MProps]
- end),
-
- BaseProps = [
- {cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}},
- {compact_running, false},
- {data_size, 0},
- {db_name, fabric2_db:name(Db)},
- {disk_format_version, 0},
- {disk_size, 0},
- {instance_start_time, <<"0">>},
- {purge_seq, 0}
- ],
-
- {ok, lists:foldl(fun({Key, Val}, Acc) ->
- lists:keystore(Key, 1, Acc, {Key, Val})
- end, BaseProps, DbProps)}.
-
-
-get_doc_count(DbName) ->
- get_doc_count(DbName, <<"_all_docs">>).
-
-
-get_doc_count(DbName, <<"_all_docs">>) ->
- get_doc_count(DbName, <<"doc_count">>);
-
-get_doc_count(DbName, <<"_design">>) ->
- get_doc_count(DbName, <<"doc_design_count">>);
-
-get_doc_count(DbName, <<"_local">>) ->
- get_doc_count(DbName, <<"doc_local_count">>);
-
-get_doc_count(Db, Key) ->
- fabric2_db:with_tx(Db, fun(TxDb) ->
- Future = fabric2_db:get(TxDb, {<<"meta">>, <<"stats">>, Key}),
- ?bin2uint(erlfdb:wait(Future))
- end).
-
-
-get_security(DbName) when is_binary(DbName) ->
- get_security(open_db(DbName));
-
-get_security(Db) ->
- Key = {<<"meta">>, <<"config">>, <<"security_doc">>},
- SecJson = fabric2_db:with_tx(Db, fun(TxDb) ->
- erlfdb:wait(fabric2_db:get(TxDb, Key))
- end),
- ?JSON_DECODE(SecJson).
-
-
-set_security(DbName, ErlJson) when is_binary(DbName) ->
- set_security(open_db(DbName), ErlJson);
-
-set_security(Db, ErlJson) ->
- Key = {<<"meta">>, <<"config">>, <<"security_doc">>},
- SecJson = ?JSON_ENCODE(ErlJson),
- fabric2_db:with_tx(Db, fun(TxDb) ->
- fabric2_db:set(TxDb, Key, SecJson)
- end).
open_doc(Db, DocId, Options) ->
diff --git a/src/fabric/src/fabric_app.erl b/src/fabric/src/fabric2_app.erl
index 77d9886a7..9becf8bfa 100644
--- a/src/fabric/src/fabric_app.erl
+++ b/src/fabric/src/fabric2_app.erl
@@ -10,7 +10,7 @@
% License for the specific language governing permissions and limitations under
% the License.
--module(fabric_app).
+-module(fabric2_app).
-behaviour(application).
@@ -21,7 +21,7 @@
start(_Type, StartArgs) ->
- fabric_sup:start_link(StartArgs).
+ fabric2_sup:start_link(StartArgs).
stop(_State) ->
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 7bb7f0838..5df51dde7 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -14,175 +14,739 @@
-export([
+ create/2,
open/2,
- init/3,
+ delete/2,
name/1,
- get_vdus/1,
+ get_after_doc_read_fun/1,
+ get_before_doc_update_fun/1,
+ get_committed_update_seq/1,
+ get_compacted_seq/1,
+ get_compactor_pid/1,
+ get_db_info/1,
+ %% get_partition_info/2,
+ get_del_doc_count/1,
+ get_doc_count/1,
+ get_doc_count/2,
+ %% get_epochs/1,
+ %% get_filepath/1,
+ get_instance_start_time/1,
+ get_pid/1,
+ get_revs_limit/1,
+ get_security/1,
+ get_update_seq/1,
+ get_user_ctx/1,
+ get_uuid/1,
+ %% get_purge_seq/1,
+ %% get_oldest_purge_seq/1,
+ %% get_purge_infos_limit/1,
+
+ is_clustered/1,
+ is_db/1,
+ is_partitioned/1,
+ is_system_db/1,
+ is_system_db_name/1,
+
+ set_revs_limit/2,
+ %% set_purge_infos_limit/2,
+ set_security/2,
+ set_user_ctx/2,
+
+ ensure_full_commit/1,
+ ensure_full_commit/2,
+
+ %% load_validation_funs/1,
+ %% reload_validation_funs/1,
+
+ open_doc/2,
+ open_doc/3,
+ open_doc_revs/4,
+ %% open_doc_int/3,
+ %% get_doc_info/2,
+ %% get_full_doc_info/2,
+ %% get_full_doc_infos/2,
+ %% get_missing_revs/2,
+ %% get_design_doc/2,
+ %% get_design_docs/1,
+ %% get_design_doc_count/1,
+ %% get_purge_infos/2,
+
+ %% get_minimum_purge_seq/1,
+ %% purge_client_exists/3,
+
+ %% validate_docid/2,
+ %% doc_from_json_obj_validate/2,
+
+ update_doc/2,
+ update_doc/3,
+ update_docs/2,
+ update_docs/3,
+ %% delete_doc/3,
+
+ %% purge_docs/2,
+ %% purge_docs/3,
+
+ %% with_stream/3,
+ %% open_write_stream/2,
+ %% open_read_stream/2,
+ %% is_active_stream/2,
+
+ %% fold_docs/3,
+ %% fold_docs/4,
+ %% fold_local_docs/4,
+ %% fold_design_docs/4,
+ %% fold_changes/4,
+ %% fold_changes/5,
+ %% count_changes_since/2,
+ %% fold_purge_infos/4,
+ %% fold_purge_infos/5,
+
+ %% calculate_start_seq/3,
+ %% owner_of/2,
+
+ %% start_compact/1,
+ %% cancel_compact/1,
+ %% wait_for_compaction/1,
+ %% wait_for_compaction/2,
+
+ %% dbname_suffix/1,
+ %% normalize_dbname/1,
+ %% validate_dbname/1,
+
+ %% make_doc/5,
+ new_revid/1
+]).
- with_tx/2,
- pack/2,
- pack_vs/2,
- unpack/2,
- range_bounds/2,
+-include_lib("fabric/include/fabric2.hrl").
- get/2,
- get_range/3,
- get_range/4,
- get_range_startswith/2,
- get_range_startswith/3
-]).
+-define(DBNAME_REGEX,
+ "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*" % use the stock CouchDB regex
+ "(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end
+).
--include_lib("couch/include/couch_db.hrl").
--include_lib("fabric/include/fabric.hrl").
+-define(RETURN(Term), throw({?MODULE, Term})).
-open(DbName, Options) when is_binary(DbName), is_list(Options) ->
- BaseDb = #{
- vsn => 1,
- name => DbName,
- tx => undefined,
- dir => undefined,
+create(DbName, Options) ->
+ Result = fabric2_util:transactional(DbName, Options, fun(TxDb) ->
+ case fabric2_fdb:db_exists(TxDb) of
+ true ->
+ {error, file_exists};
+ false ->
+ fabric2_fdb:db_create(TxDb)
+ end,
+ end),
+ % We cache outside of the transaction so that we're sure
+ % that this request created the database
+ case Result of
+ #{} = Db ->
+ fabric2_server:store(Db);
+ Error ->
+ Error
+ end.
+
+
+open(DbName, Options) ->
+ case fabric2_server:fetch(DbName) of
+ #{} = Db ->
+ fabric2_util:transactional(Db, fun(TxDb) ->
+ case fabric2_fdb:db_is_current(TxDb) of
+ true ->
+ Db;
+ false ->
+ Reopend = fabric2_fdb:db_open(TxDb),
+ fabric2_server:store(Reopened)
+ end
+ end);
+ undefined ->
+ fabric2_util:transactional(DbName, Options, fun(TxDb) ->
+ Opened = fabric2_fdb:db_open(TxDb),
+ fabric2_server:store(Opened)
+ end)
+ end.
+
+
+delete(DbName, Options) ->
+ % This will throw if the db does not exist
+ Db = open(DbName, Options),
+ with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:db_delete(TxDb)
+ end).
- instance_start_time => 0,
- user_ctx => #user_ctx{},
- security => [],
- validate_doc_update => [],
- before_doc_update => nil,
- after_doc_read => nil,
- options => []
- },
- lists:foldl(fun({K, V}, DbAcc) ->
- maps:put(K, V, DbAcc)
- end, BaseDb, Options).
+name(#{name := DbName}) ->
+ DbName.
+
+
+get_after_doc_read_fun(#{after_doc_read := AfterDocRead}) ->
+ AfterDocRead.
+
+
+get_before_doc_update_fun(#{before_doc_update := BeforeDocUpdate}) ->
+ BeforeDocUpdate.
+get_commited_update_seq(#{} = Db) ->
+ get_update_seq(Db).
-name(#{name := Name}) ->
- Name.
+get_compacted_seq(#{} = Db) ->
+ get_update_seq(Db).
-get_vdus(#{validate_doc_update := VDUs}) ->
- VDUs.
+get_compactor_pid(#{} = Db) ->
+ nil.
-init(DbName, Tx, DbDir) ->
- TxDb = open(DbName, [{tx, Tx}, {dir, DbDir}]),
- Defaults = [
- {{<<"meta">>, <<"config">>, <<"revs_limit">>}, ?uint2bin(1000)},
- {{<<"meta">>, <<"config">>, <<"security_doc">>}, <<"{}">>},
- {{<<"meta">>, <<"stats">>, <<"doc_count">>}, ?uint2bin(0)},
- {{<<"meta">>, <<"stats">>, <<"doc_del_count">>}, ?uint2bin(0)},
- {{<<"meta">>, <<"stats">>, <<"doc_design_count">>}, ?uint2bin(0)},
- {{<<"meta">>, <<"stats">>, <<"doc_local_count">>}, ?uint2bin(0)},
- {{<<"meta">>, <<"stats">>, <<"size">>}, ?uint2bin(2)}
+
+get_db_info(#{} = Db) ->
+ DbProps = with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:db_get_info(TxDb)
+ end),
+
+ BaseProps = [
+ {cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}},
+ {compact_running, false},
+ {data_size, 0},
+ {db_name, DbName},
+ {disk_format_version, 0},
+ {disk_size, 0},
+ {instance_start_time, <<"0">>},
+ {purge_seq, 0}
],
- lists:foreach(fun({K, V}) ->
- erlfdb:set(Tx, pack(TxDb, K), V)
- end, Defaults).
-
-
-with_tx(Db, Fun) when is_function(Fun, 1) ->
- DbName = maps:get(name, Db),
- DbsDir = fabric_server:get_dir(dbs),
- fabric_server:transactional(fun(Tx) ->
- % We'll eventually want to cache this in the
- % fabric_server ets table.
- DbDir = try
- erlfdb_directory:open(Tx, DbsDir, DbName)
- catch error:{erlfdb_directory, {open_error, path_missing, _}} ->
- erlang:error(database_does_not_exist)
- end,
- TxDb = Db#{tx := Tx, dir := DbDir},
- Fun(TxDb)
+
+ {ok, lists:foldl(fun({Key, Val}, Acc) ->
+ lists:keystore(Key, 1, Acc, {Key, Val})
+ end, BaseProps, DbProps)}.
+
+
+get_del_doc_count(#{} = Db) ->
+ get_doc_count(Db, <<"doc_del_count">>).
+
+
+get_doc_count(Db) ->
+ get_doc_count(Db, <<"doc_count">>).
+
+
+get_doc_count(Db, <<"_all_docs">>) ->
+ get_doc_count(Db, <<"doc_count">>);
+
+get_doc_count(DbName, <<"_design">>) ->
+ get_doc_count(DbName, <<"doc_design_count">>);
+
+get_doc_count(DbName, <<"_local">>) ->
+ get_doc_count(DbName, <<"doc_local_count">>);
+
+get_doc_count(Db, Key) ->
+ with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:get_stat(TxDb, Key)
end).
-pack(#{dir := undefined} = Db, _Tuple) ->
- erlang:error({no_directory, Db});
+get_instance_startime(#{}) ->
+ 0.
-pack(Db, Tuple) ->
- #{
- dir := Dir
- } = Db,
- erlfdb_directory:pack(Dir, Tuple).
+get_pid(#{}) ->
+ nil.
-pack_vs(#{dir := undefined} = Db, _Tuple) ->
- erlang:error({no_directory, Db});
-pack_vs(Db, Tuple) ->
- #{
- dir := Dir
- } = Db,
- erlfdb_directory:pack_vs(Dir, Tuple).
+get_revs_limit(#{revs_limit := RevsLimit}) ->
+ RevsLimit.
-unpack(#{dir := undefined} = Db, _Key) ->
- erlang:error({no_directory, Db});
+get_security(#{security_doc := SecurityDoc}) ->
+ SecurityDoc.
-unpack(Db, Key) ->
- #{
- dir := Dir
- } = Db,
- erlfdb_directory:unpack(Dir, Key).
+get_update_seq(#{} = Db) ->
+ case fabric2_fdb:db_get_changes(Db, [{limit, 1}, {reverse, true}]) of
+ [] ->
+ fabric2_util:to_hex(<<0:80>>);
+ [{Seq, _}] ->
+ Seq
+ end.
-range_bounds(#{dir := undefined} = Db, _Key) ->
- erlang:error({no_directory, Db});
-range_bounds(Db, Key) ->
- #{
- dir := Dir
- } = Db,
- erlfdb_directory:range(Dir, Key).
+get_uuid(#{uuid := UUID}) ->
+ UUID.
+is_clustered(#{}) ->
+ false.
-get(#{tx := undefined} = Db, _Key) ->
- erlang:error({invalid_tx_db, Db});
-get(Db, Key) ->
- #{
- tx := Tx,
- dir := Dir
- } = Db,
- BinKey = erlfdb_directory:pack(Dir, Key),
- erlfdb:get(Tx, BinKey).
+is_db(#{name := _}) ->
+ true;
+is_db(_) ->
+ false.
-get_range(Db, StartKey, EndKey) ->
- get_range(Db, StartKey, EndKey, []).
+is_partitioned(#{}) ->
+ false.
-get_range(#{tx := undefined} = Db, _, _, _) ->
- erlang:error({invalid_tx_db, Db});
+is_system_db(#{name := DbName}) ->
+ is_system_db_name(DbName).
-get_range(Db, StartKey, EndKey, Options) ->
- #{
- tx := Tx,
- dir := Dir
- } = Db,
- BinStartKey = erlfdb_directory:pack(Dir, StartKey),
- BinEndKey= erlfdb_directory:pack(Dir, EndKey),
- erlfdb:get_range(Tx, BinStartKey, BinEndKey, Options).
+is_system_db_name(DbName) when is_list(DbName) ->
+ is_system_db_name(?l2b(DbName));
+is_system_db_name(DbName) when is_binary(DbName) ->
+ Suffix = filename:basename(DbName),
+ case {filename:dirname(DbName), lists:member(Suffix, ?SYSTEM_DATABASES)} of
+ {<<".">>, Result} -> Result;
+ {_Prefix, false} -> false;
+ {Prefix, true} ->
+ ReOpts = [{capture,none}, dollar_endonly],
+ re:run(Prefix, ?DBNAME_REGEX, ReOpts) == match
+ end.
-get_range_startswith(Db, Prefix) ->
- get_range_startswith(Db, Prefix, []).
+set_revs_limit(#{} = Db, RevsLimit) ->
+ RevsLimBin = ?uint2bin(RevsLimit),
+ with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:db_set_config(TxDb, <<"revs_limit">>, RevsLimBin)
+ end).
-get_range_startswith(#{tx := undefined} = Db, _, _) ->
- erlang:error({invalid_tx_db, Db});
-get_range_startswith(Db, Prefix, Options) ->
+set_security(#{} = Db, Security) ->
+ SecBin = ?JSON_ENCODE(Security),
+ with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:db_set_config(TxDb, <<"security_doc">>, SecBin)
+ end).
+
+
+set_user_ctx(#{} = Db, UserCtx) ->
+ Db#{user_ctx => UserCtx}.
+
+
+ensure_full_commit(#{}) ->
+ {ok, 0}.
+
+
+ensure_full_commit(#{}, _Timeout) ->
+ {ok, 0}.
+
+
+open_doc(#{} = Db, DocId) ->
+ open_doc(Db, DocId, []).
+
+
+open_doc(#{} = Db, DocId, Options) ->
+ with_tx(Db, fun(TxDb) ->
+ case fabric2_fdb:get_full_doc_info(TxDb, DocId) of
+ not_found ->
+ {not_found, missing};
+ #full_doc_info{} = FDI ->
+ {_, Path} = couch_doc:to_doc_info_path(FDI),
+ case fabric2_fdb:get_doc_body(TxDb, DocId, Path) of
+ #doc{} = Doc -> {ok, Doc};
+ Error -> Error
+ end
+ end
+ end).
+
+
+open_doc_revs(Db, FDI, Revs, Options) ->
+ #full_doc_info{
+ id = Id,
+ rev_tree = RevTree
+ } = FDI,
+ Latest = lists:member(latest, Options),
+ {Found, Missing} = case Revs of
+ all ->
+ {couch_key_tree:get_all_leafs(RevTree), []};
+ _ when Latest ->
+ couch_key_tree:get_key_leafs(RevTree, Revs);
+ _ ->
+ couch_key_tree:get(RevTree, Revs)
+ end,
+ Docs = with_tx(Db, fun(TxDb) ->
+ lists:map(fun({Value, {Pos, [Rev | _]} = RevPath}) ->
+ case Value of
+ ?REV_MISSING ->
+ % We have the rev in our list but know nothing about it
+ {{not_found, missing}, {Pos, Rev}};
+ _ ->
+ case fabric2_fdb:get_doc_body(Db, Id, RevPath) of
+ #doc{} = Doc -> {ok, Doc};
+ Else -> {Else, {Pos, Rev}}
+ end
+ end
+ end, Found)
+ end),
+ MissingDocs = [{{not_found, missing}, MRev} || MRev <- Missing],
+ {ok, Docs ++ MissingDocs}.
+
+
+update_doc(Db, Doc) ->
+ update_doc(Db, Doc, []).
+
+
+update_doc(Db, Doc, Options) ->
+ with_tx(Db, fun(TxDb) ->
+ update_doc_int(TxDb, Doc, Options)
+ end).
+
+
+update_docs(Db, Docs) ->
+ update_docs(Db, Docs, []).
+
+
+update_docs(Db, Docs, Options) ->
+ with_tx(Db, fun(TxDb) ->
+ {Resps, Status} = lists:mapfoldl(fun(Doc, Acc) ->
+ case fabric2_doc:update(TxDb, Doc, opts(Options)) of
+ {ok, _} = Resp ->
+ {Resp, Acc};
+ {error, _} = Resp ->
+ {Resp, error}
+ end
+ end, ok, Docs),
+ {Status, Resps}
+ end).
+
+
+new_revid(Doc) ->
+ #doc{
+ body = Body,
+ revs = {OldStart, OldRevs},
+ atts = Atts,
+ deleted = Deleted
+ } = Doc,
+
+ DigestedAtts = lists:foldl(fun(Att, Acc) ->
+ [N, T, M] = couch_att:fetch([name, type, md5], Att),
+ case M == <<>> of
+ true -> Acc;
+ false -> [{N, T, M} | Acc]
+ end
+ end, [], Atts),
+
+ Rev = case DigestedAtts of
+ Atts2 when length(Atts) =/= length(Atts2) ->
+ % We must have old style non-md5 attachments
+ list_to_binary(integer_to_list(couch_util:rand32()));
+ Atts2 ->
+ OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end,
+ SigTerm = [Deleted, OldStart, OldRev, Body, Atts2],
+ couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}]))
+ end,
+
+ Doc#doc{revs = {OldStart + 1, [Rev | OldRevs]}}.
+
+
+% TODO: Handle _local docs separately.
+update_doc_int(#{} = Db, #doc{} = Doc0, Options) ->
+ UpdateType = case lists:member(replicated_changes, Options) of
+ true -> replicated_changes;
+ false -> interactive_edit
+ end,
+
+ try
+ FDI1 = fabric2_fdb:get_full_doc_info(Db, Doc0#doc.id),
+ Doc1 = prep_and_validate(TxDb, FDI1, Doc0, UpdateType),
+ Doc2 = case UpdateType of
+ interactive_edit -> new_revid(Doc1);
+ replicated_changes -> Doc1
+ end,
+ FDI2 = if FDI1 /= not_found -> FDI1; true ->
+ #full_doc_info{id = Doc2#doc.id}
+ end,
+ {FDI3, Doc3} = merge_rev_tree(FDI2, Doc2, UpdateType),
+
+ OldExists = case FDI1 of
+ not_found -> false;
+ #full_doc_info{deleted = true} -> false;
+ _ -> true
+ end,
+ NewExists = not FDI3#full_doc_info.deleted,
+
+ ok = fabric2_fdb:write_doc(Db, FDI3, Doc3)
+
+ case {OldExists, NewExists} of
+ {false, true} ->
+ fabric2_fdb:incr_stat(Db, <<"doc_count">>, 1);
+ {true, false} ->
+ fabric2_fdb:incr_stat(Db, <<"doc_count">>, -1),
+ fabric2_fdb:incr_stat(Db, <<"doc_del_count">>, 1);
+ {Exists, Exists} ->
+ % No change
+ ok
+ end,
+
+ % Need to count design documents
+ % Need to track db size changes
+
+ {ok, {RevStart, Rev}}
+ catch throw:{?MODULE, Return} ->
+ Return
+ end.
+
+prep_and_validate(Db, not_found, Doc, UpdateType) ->
+ case Doc#doc.revs of
+ {0, []} ->
+ ok;
+ _ when UpdateType == replicated_changes ->
+ ok;
+ _ ->
+ ?RETURN({error, conflict})
+ end,
+ prep_and_validate(Db, Doc, fun() -> nil end);
+
+prep_and_validate(Db, FDI, Doc, interactive_edit) ->
+ #doc{
+ revs = {Start, Revs}
+ } = Doc,
+
+ Leafs = couch_key_tree:get_all_leafs(FDI#full_doc_info.rev_tree),
+ LeafRevs = lists:map(fun({_Leaf, {LeafStart, [LeafRev | _] = Path}}) ->
+ {{LeafStart, LeafRev}, Path}
+ end, Leafs),
+
+ GetDocFun = case Revs of
+ [PrevRev | _] ->
+ case lists:keyfind({Start, PrevRev}, 1, LeafRevs) of
+ {{Start, PrevRev}, Path} ->
+ fun() ->
+ fabric2_fdb:get_doc_body(Db, Doc#doc.id, {Start, Path})
+ end;
+ false ->
+ ?RETURN({error, conflict})
+ end;
+ [] ->
+ case FDI#full_doc_info.deleted of
+ true ->
+ fun() -> nil end;
+ false ->
+ ?RETURN({error, conflict})
+ end
+ end,
+ prep_and_validate(TxDb, Doc, GetDocFun);
+
+prep_and_validate(TxDb, FDI, Doc, replicated_changes) ->
+ #full_doc_info{
+ rev_tree = RevTree
+ } = FDI,
+ OldLeafs = couch_key_tree:get_all_leafs_full(RevTree),
+ OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _} | _]} <- OldLeafs],
+
+ NewPath = couch_doc:to_path(Doc),
+ NewRevTree = couch_key_tree:merge(RevTree, NewPath),
+
+ Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
+ LeafRevsFull = lists:map(fun({Start, [{RevId, _} | _]} = FullPath) ->
+ [{{Start, RevId}, FullPath}]
+ end, Leafs),
+ LeafRevsFullDict = dict:from_list(LeafRevsFull),
+
+ #doc{
+ revs = {DocStart, [DocRev | _]}
+ } = Doc,
+ DocRevId = {DocStart, DocRev},
+
+ IsOldLeaf = lists:member(DocRevId, OldLeafsLU),
+ GetDocFun = case dict:find(DocRevId, LeafRevsFullDict) of
+ {ok, {DocStart, RevPath}} when not IsOldLeaf ->
+ % An incoming replicated edit only sends us leaf
+ % nodes which may have included multiple updates
+ % we haven't seen locally. Thus we have to search
+ % back through the tree to find the first edit
+ % we do know about.
+ case find_prev_known_rev(DocStart, RevPath) of
+ not_found -> fun() -> nil end;
+ PrevRevs -> fun() ->
+ fabric2_fdb:get_doc_body(Db, Doc#doc.id, PrevRevs)
+ end
+ end;
+ _ ->
+ % The update merged to an internal node that we
+ % already know about which means we're done with
+ % this update.
+ ?RETURN({ok, []})
+ end,
+
+ prep_and_validate(TxDb, Doc, GetDocFun).
+
+
+prep_and_validate(Db, Doc, GetDocBody) ->
+ NewDoc = case couch_doc:has_stubs(Doc) of
+ true ->
+ case GetDocBody() of
+ #doc{} = PrevDoc ->
+ couch_doc:merge_stubs(Doc, PrevDoc);
+ _ ->
+ % Force a missing stubs error
+ couch_doc:mege_stubs(Doc, #doc{})
+ end;
+ false ->
+ Doc
+ end,
+ validate_doc_update(Db, NewDoc, GetDocBody),
+ NewDoc.
+
+
+merge_rev_tree(FDI, Doc, interactive_edit) when FDI#full_doc_info.deleted ->
+ % We're recreating a document that was previously
+ % deleted. To check that this is a recreation from
+ % the root we assert that the new document has a
+ % revision depth of 1 (this is to avoid recreating a
+ % doc from a previous internal revision) and is also
+ % not deleted. To avoid expanding the revision tree
+ % unnecessarily we create a new revision based on
+ % the winning deleted revision.
+
+ {RevDepth, _} = Doc#doc.revs,
+ case RevDepth == 1 andalso not Doc#doc.deleted of
+ true ->
+ % Update the new doc based on revisions in OldInfo
+ #doc_info{revs=[WinningRev | _]} = couch_doc:to_doc_info(FDI),
+ #rev_info{rev={OldPos, OldRev}} = WinningRev,
+ Body = case couch_util:get_value(comp_body, Doc#doc.meta) of
+ CompBody when is_binary(CompBody) ->
+ couch_compress:decompress(CompBody);
+ _ ->
+ Doc#doc.body
+ end,
+ NewDoc = new_revid(Doc#doc{
+ revs = {OldPos, [OldRev]},
+ body = Body
+ }),
+
+ % Merge our modified new doc into the tree
+ #full_doc_info{rev_tree = RevTree} = FDI,
+ case couch_key_tree:merge(RevTree, couch_doc:to_path(NewDoc)) of
+ {NewRevTree, new_leaf} ->
+ % We changed the revision id so inform the caller
+ NewFDI = FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ deleted = false
+ },
+ {NewFDI, NewDoc};
+ _ ->
+ throw(doc_recreation_failed)
+ end;
+ _ ->
+ ?RETURN({error, conflict})
+ end;
+merge_rev_tree(FDI, Doc, interactive_edit) ->
+ % We're attempting to merge a new revision into an
+ % undeleted document. To not be a conflict we require
+ % that the merge results in extending a branch.
+
+ RevTree = FDI#full_doc_info.rev_tree,
+ case couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)) of
+ {NewRevTree, new_leaf} when not Doc#doc.deleted ->
+ NewFDI = FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ deleted = false
+ },
+ {NewFDI, Doc};
+ {NewRevTree, new_leaf} when Doc#doc.deleted ->
+ % We have to check if we just deleted this
+ % document completely or if it was a conflict
+ % resolution.
+ NewFDI = FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ deleted = couch_doc:is_deleted(NewRevTree)
+ },
+ {NewFDI, Doc};
+ _ ->
+ ?RETURN({error, conflict})
+ end;
+merge_rev_tree(FDI, Doc, replicated_changes) ->
+ % We're merging in revisions without caring about
+ % conflicts. Most likely this is a replication update.
+ RevTree = FDI#full_doc_info.rev_tree,
+ {NewRevTree, _} = couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)),
+ NewFDI = FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ deleted = couch_doc:is_deleted(NewRevTree)
+ },
+ % If a replicated change did not modify the revision
+ % tree then we've got nothing else to do.
+ if NewFDI /= FDI -> ok; true ->
+ ?RETURN({ok, []})
+ end,
+ {NewFDI, Doc}.
+
+
+validate_doc_update(Db, #doc{id = <<"_design/", _/binary>>} = Doc, _) ->
+ #{
+ security_doc := Security
+ } = Db,
+ case catch check_is_admin(Db) of
+ ok -> validate_ddoc(Db, Doc);
+ Error -> ?RETURN(Error)
+ end;
+validate_doc_update(_Db, #doc{id = <<"_local/", _/binary>>}, _) ->
+ ok;
+validate_doc_update(Db, Doc, GetDiskDocFun) ->
#{
- tx := Tx,
- dir := Dir
+ security_doc := Security,
+ user_ctx := UserCtx,
+ validate_doc_update_funs := VDUs
} = Db,
- BinPrefix = erlfdb_directory:pack(Dir, Prefix),
- erlfdb:get_range_startswith(Tx, BinPrefix, Options).
+ Fun = fun() ->
+ DiskDoc = GetDiskDocFun(),
+ JsonCtx = fabric2_util:user_ctx_to_json(TxDb),
+ try
+ lists:map(fun(VDU) ->
+ case VDU(Doc, DiskDoc, JsonCtx, Security) of
+ ok -> ok;
+ Error -> ?RETURN(Error)
+ end
+ end, VDUs),
+ ok
+ catch
+ throw:Error ->
+ Error
+ end
+ end,
+ Stat = [couchdb, query_server, vdu_process_time],
+ if VDUs == [] -> ok; true ->
+ couch_stats:update_histogram(Stat, Fun)
+ end.
+
+
+validate_ddoc(Db, DDoc) ->
+ try
+ ok = couch_index_server:validate(Db, couch_doc:with_ejson_body(DDoc))
+ catch
+ throw:{invalid_design_doc, Reason} ->
+ throw({bad_request, invalid_design_doc, Reason});
+ throw:{compilation_error, Reason} ->
+ throw({bad_request, compilation_error, Reason});
+ throw:Error ->
+ ?RETURN(Error)
+ end.
+
+
+find_prev_known_rev(_Pos, []) ->
+ not_found;
+find_prev_known_rev(Pos, [{_Rev, #doc{}} | RestPath]) ->
+ % doc records are skipped because these are the result
+ % of replication sending us an update. We're only interested
+ % in what we have locally since we're comparing attachment
+ % stubs. The replicator should never do this because it
+ % should only ever send leaves but the possibility exists
+ % so we account for it.
+ find_prev_known_rev(Pos - 1, RestPath);
+find_prev_known_rev(Pos, [{_Rev, ?REV_MISSING} | RestPath]) ->
+ find_prev_known_rev(Pos - 1, RestPath);
+find_prev_known_rev(Pos, [{_Rev, #leaf{}} | _] = DocPath) ->
+ {Pos, [Rev || {Rev, _Val} <- DocPath]}.
+
+
+with_tx(#{tx := undefined} = Db, Fun) ->
+ fabric2_util:transactional(fun(Tx) ->
+ Fun(Db#{tx => Tx})
+ end);
+
+with_tx(#{tx := {erlfdb_transaction, _}} = Db, Fun) ->
+ Fun(Db).
+
diff --git a/src/fabric/src/fabric2_doc.erl b/src/fabric/src/fabric2_doc.erl
deleted file mode 100644
index d3ff04f39..000000000
--- a/src/fabric/src/fabric2_doc.erl
+++ /dev/null
@@ -1,494 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric2_doc).
-
-
--export([
- get_fdi/2,
-
- open/3,
- open_revs/4,
-
- update/3
-]).
-
-
--include_lib("couch/include/couch_db.hrl").
-
-
--define(RETURN(Term), throw({?MODULE, Term})).
-
-
-get_fdi(TxDb, DocId) ->
- Future = fabric2_db:get(TxDb, {<<"docs">>, DocId}),
- fdb_to_fdi(TxDb, DocId, erlfdb:wait(Future)).
-
-
-open(TxDb, DocId, {Pos, [Rev | _] = Path}) ->
- Key = {<<"revs">>, DocId, Pos, Rev},
- Future = fabric2_db:get(TxDb, Key),
- fdb_to_doc(TxDb, DocId, Pos, Path, erlfdb:wait(Future)).
-
-
-open_revs(TxDb, FDI, Revs, Options) ->
- #full_doc_info{
- id = Id,
- rev_tree = RevTree
- } = FDI,
- Latest = lists:member(latest, Options),
- {Found, Missing} = case Revs of
- all ->
- {couch_key_tree:get_all_leafs(RevTree), []};
- _ when Latest ->
- couch_key_tree:get_key_leafs(RevTree, Revs);
- _ ->
- couch_key_tree:get(RevTree, Revs)
- end,
- Docs = lists:map(fun({Value, {Pos, [Rev | _]} = RevPath}) ->
- case Value of
- ?REV_MISSING ->
- % We have the rev in our list but know nothing about it
- {{not_found, missing}, {Pos, Rev}};
- _ ->
- case open(TxDb, Id, RevPath) of
- #doc{} = Doc -> {ok, Doc};
- Else -> {Else, {Pos, Rev}}
- end
- end
- end, Found),
- MissingDocs = [{{not_found, missing}, MRev} || MRev <- Missing],
- {ok, Docs ++ MissingDocs}.
-
-
-% TODO: Handle _local docs separately.
-update(TxDb, #doc{} = Doc0, Options) ->
- UpdateType = case lists:member(replicated_changes, Options) of
- true -> replicated_changes;
- false -> interactive_edit
- end,
-
- try
- FDI1 = get_fdi(TxDb, Doc0#doc.id),
- Doc1 = prep_and_validate(TxDb, FDI1, Doc0, UpdateType),
- Doc2 = case UpdateType of
- interactive_edit -> new_revid(Doc1);
- replicated_changes -> Doc1
- end,
- FDI2 = if FDI1 /= not_found -> FDI1; true ->
- #full_doc_info{id = Doc2#doc.id}
- end,
- {FDI3, Doc3} = merge_rev_tree(FDI2, Doc2, UpdateType),
-
- #{tx := Tx} = TxDb,
-
- % Delete old entry in changes feed
- OldSeqKey = {<<"changes">>, FDI3#full_doc_info.update_seq},
- OldSeqKeyBin = fabric2_db:pack(TxDb, OldSeqKey),
- erlfdb:clear(Tx, OldSeqKeyBin),
-
- % Add new entry to changes feed
- NewSeqKey = {<<"changes">>, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}},
- NewSeqKeyBin = fabric2_db:pack_vs(TxDb, NewSeqKey),
- erlfdb:set_versionstamped_key(Tx, NewSeqKeyBin, Doc3#doc.id),
-
- {RevStart, [Rev | _]} = Doc3#doc.revs,
-
- % Write document data
- {NewDocKey, NewDocVal} = doc_to_fdb(TxDb, Doc3),
- erlfdb:set(Tx, NewDocKey, NewDocVal),
-
- % Update revision tree entry
- {NewFDIKey, NewFDIVal} = fdi_to_fdb(TxDb, FDI3),
- erlfdb:set_versionstamped_value(Tx, NewFDIKey, NewFDIVal),
-
- {DocIncr, DocDelIncr} = case {FDI1, FDI3} of
- {not_found, _} ->
- {1, 0};
- {#full_doc_info{deleted = true}, #full_doc_info{deleted = false}} ->
- {1, -1};
- {#full_doc_info{deleted = false}, #full_doc_info{deleted = true}} ->
- {-1, 1};
- _ ->
- {0, 0}
- end,
-
- DocCountKey = {<<"meta">>, <<"stats">>, <<"doc_count">>},
- DocCountKeyBin = fabric2_db:pack(TxDb, DocCountKey),
- DocDelCountKey = {<<"meta">>, <<"stats">>, <<"doc_del_count">>},
- DocDelCountKeyBin = fabric2_db:pack(TxDb, DocDelCountKey),
-
- if DocIncr == 0 -> ok; true ->
- erlfdb:add(Tx, DocCountKeyBin, DocIncr)
- end,
-
- if DocDelIncr == 0 -> ok; true ->
- erlfdb:add(Tx, DocDelCountKeyBin, DocDelIncr)
- end,
-
- % TODO: Count design docs separately
-
- % And done.
- {ok, {RevStart, Rev}}
- catch throw:{?MODULE, Return} ->
- Return
- end.
-
-
-prep_and_validate(TxDb, not_found, Doc, UpdateType) ->
- case Doc#doc.revs of
- {0, []} ->
- ok;
- _ when UpdateType == replicated_changes ->
- ok;
- _ ->
- ?RETURN({error, conflict})
- end,
- prep_and_validate(TxDb, Doc, fun() -> nil end);
-
-prep_and_validate(TxDb, FDI, Doc, interactive_edit) ->
- #doc{
- revs = {Start, Revs}
- } = Doc,
-
- Leafs = couch_key_tree:get_all_leafs(FDI#full_doc_info.rev_tree),
- LeafRevs = lists:map(fun({_Leaf, {LeafStart, [LeafRev | _] = Path}}) ->
- {{LeafStart, LeafRev}, Path}
- end, Leafs),
-
- GetDocFun = case Revs of
- [PrevRev | _] ->
- case lists:keyfind({Start, PrevRev}, 1, LeafRevs) of
- {{Start, PrevRev}, Path} ->
- fun() -> open(TxDb, Doc#doc.id, {Start, Path}) end;
- false ->
- ?RETURN({error, conflict})
- end;
- [] ->
- case FDI#full_doc_info.deleted of
- true ->
- fun() -> nil end;
- false ->
- ?RETURN({error, conflict})
- end
- end,
- prep_and_validate(TxDb, Doc, GetDocFun);
-
-prep_and_validate(TxDb, FDI, Doc, replicated_changes) ->
- #full_doc_info{
- rev_tree = RevTree
- } = FDI,
- OldLeafs = couch_key_tree:get_all_leafs_full(RevTree),
- OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _} | _]} <- OldLeafs],
-
- NewPath = couch_doc:to_path(Doc),
- NewRevTree = couch_key_tree:merge(RevTree, NewPath),
-
- Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
- LeafRevsFull = lists:map(fun({Start, [{RevId, _} | _]} = FullPath) ->
- [{{Start, RevId}, FullPath}]
- end, Leafs),
- LeafRevsFullDict = dict:from_list(LeafRevsFull),
-
- #doc{
- revs = {DocStart, [DocRev | _]}
- } = Doc,
- DocRevId = {DocStart, DocRev},
-
- IsOldLeaf = lists:member(DocRevId, OldLeafsLU),
- GetDocFun = case dict:find(DocRevId, LeafRevsFullDict) of
- {ok, {DocStart, RevPath}} when not IsOldLeaf ->
- % An incoming replicated edit only sends us leaf
- % nodes which may have included multiple updates
- % we haven't seen locally. Thus we have to search
- % back through the tree to find the first edit
- % we do know about.
- case find_prev_known_rev(DocStart, RevPath) of
- not_found -> fun() -> nil end;
- PrevRevs -> fun() -> open(TxDb, Doc#doc.id, PrevRevs) end
- end;
- _ ->
- % The update merged to an internal node that we
- % already know about which means we're done with
- % this update.
- ?RETURN({ok, []})
- end,
-
- prep_and_validate(TxDb, Doc, GetDocFun).
-
-
-prep_and_validate(TxDb, Doc, GetDocBody) ->
- NewDoc = case couch_doc:has_stubs(Doc) of
- true ->
- case GetDocBody() of
- #doc{} = PrevDoc ->
- couch_doc:merge_stubs(Doc, PrevDoc);
- _ ->
- % Force a missing stubs error
- couch_doc:mege_stubs(Doc, #doc{})
- end;
- false ->
- Doc
- end,
- validate_doc_update(TxDb, NewDoc, GetDocBody),
- NewDoc.
-
-
-merge_rev_tree(FDI, Doc, interactive_edit) when FDI#full_doc_info.deleted ->
- % We're recreating a document that was previously
- % deleted. To check that this is a recreation from
- % the root we assert that the new document has a
- % revision depth of 1 (this is to avoid recreating a
- % doc from a previous internal revision) and is also
- % not deleted. To avoid expanding the revision tree
- % unnecessarily we create a new revision based on
- % the winning deleted revision.
-
- {RevDepth, _} = Doc#doc.revs,
- case RevDepth == 1 andalso not Doc#doc.deleted of
- true ->
- % Update the new doc based on revisions in OldInfo
- #doc_info{revs=[WinningRev | _]} = couch_doc:to_doc_info(FDI),
- #rev_info{rev={OldPos, OldRev}} = WinningRev,
- Body = case couch_util:get_value(comp_body, Doc#doc.meta) of
- CompBody when is_binary(CompBody) ->
- couch_compress:decompress(CompBody);
- _ ->
- Doc#doc.body
- end,
- NewDoc = new_revid(Doc#doc{
- revs = {OldPos, [OldRev]},
- body = Body
- }),
-
- % Merge our modified new doc into the tree
- #full_doc_info{rev_tree = RevTree} = FDI,
- case couch_key_tree:merge(RevTree, couch_doc:to_path(NewDoc)) of
- {NewRevTree, new_leaf} ->
- % We changed the revision id so inform the caller
- NewFDI = FDI#full_doc_info{
- rev_tree = NewRevTree,
- deleted = false
- },
- {NewFDI, NewDoc};
- _ ->
- throw(doc_recreation_failed)
- end;
- _ ->
- ?RETURN({error, conflict})
- end;
-merge_rev_tree(FDI, Doc, interactive_edit) ->
- % We're attempting to merge a new revision into an
- % undeleted document. To not be a conflict we require
- % that the merge results in extending a branch.
-
- RevTree = FDI#full_doc_info.rev_tree,
- case couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)) of
- {NewRevTree, new_leaf} when not Doc#doc.deleted ->
- NewFDI = FDI#full_doc_info{
- rev_tree = NewRevTree,
- deleted = false
- },
- {NewFDI, Doc};
- {NewRevTree, new_leaf} when Doc#doc.deleted ->
- % We have to check if we just deleted this
- % document completely or if it was a conflict
- % resolution.
- NewFDI = FDI#full_doc_info{
- rev_tree = NewRevTree,
- deleted = couch_doc:is_deleted(NewRevTree)
- },
- {NewFDI, Doc};
- _ ->
- ?RETURN({error, conflict})
- end;
-merge_rev_tree(FDI, Doc, replicated_changes) ->
- % We're merging in revisions without caring about
- % conflicts. Most likely this is a replication update.
- RevTree = FDI#full_doc_info.rev_tree,
- {NewRevTree, _} = couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)),
- NewFDI = FDI#full_doc_info{
- rev_tree = NewRevTree,
- deleted = couch_doc:is_deleted(NewRevTree)
- },
- % If a replicated change did not modify the revision
- % tree then we've got nothing else to do.
- if NewFDI /= FDI -> ok; true ->
- ?RETURN({ok, []})
- end,
- {NewFDI, Doc}.
-
-
-
-validate_doc_update(TxDb, #doc{id = <<"_design/", _/binary>>} = Doc, _) ->
- case catch fabric2_db:check_is_admin(TxDb) of
- ok -> validate_ddoc(TxDb, Doc);
- Error -> ?RETURN(Error)
- end;
-validate_doc_update(_TxDb, #doc{id = <<"_local/", _/binary>>}, _) ->
- ok;
-validate_doc_update(TxDb, Doc, GetDiskDocFun) ->
- Fun = fun() ->
- DiskDoc = GetDiskDocFun(),
- JsonCtx = couch_util:json_user_ctx(TxDb),
- SecObj = fabric2_db:get_security(TxDb),
- try
- lists:map(fun(VDU) ->
- case VDU(Doc, DiskDoc, JsonCtx, SecObj) of
- ok -> ok;
- Error -> ?RETURN(Error)
- end
- end, fabric2_db:get_vdus(TxDb)),
- ok
- catch
- throw:Error ->
- Error
- end
- end,
- Stat = [couchdb, query_server, vdu_process_time],
- case fabric2_db:get_vdus(TxDb) of
- [] ->
- ok;
- _ ->
- couch_stats:update_histogram(Stat, Fun)
- end.
-
-
-validate_ddoc(TxDb, DDoc) ->
- try
- ok = couch_index_server:validate(TxDb, couch_doc:with_ejson_body(DDoc))
- catch
- throw:{invalid_design_doc, Reason} ->
- throw({bad_request, invalid_design_doc, Reason});
- throw:{compilation_error, Reason} ->
- throw({bad_request, compilation_error, Reason});
- throw:Error ->
- ?RETURN(Error)
- end.
-
-
-find_prev_known_rev(_Pos, []) ->
- not_found;
-find_prev_known_rev(Pos, [{_Rev, #doc{}} | RestPath]) ->
- % doc records are skipped because these are the result
- % of replication sending us an update. We're only interested
- % in what we have locally since we're comparing attachment
- % stubs. The replicator should never do this because it
- % should only ever send leaves but the possibility exists
- % so we account for it.
- find_prev_known_rev(Pos - 1, RestPath);
-find_prev_known_rev(Pos, [{_Rev, ?REV_MISSING} | RestPath]) ->
- find_prev_known_rev(Pos - 1, RestPath);
-find_prev_known_rev(Pos, [{_Rev, #leaf{}} | _] = DocPath) ->
- {Pos, [Rev || {Rev, _Val} <- DocPath]}.
-
-
-new_revid(Doc) ->
- #doc{
- body = Body,
- revs = {OldStart, OldRevs},
- atts = Atts,
- deleted = Deleted
- } = Doc,
-
- DigestedAtts = lists:foldl(fun(Att, Acc) ->
- [N, T, M] = couch_att:fetch([name, type, md5], Att),
- case M == <<>> of
- true -> Acc;
- false -> [{N, T, M} | Acc]
- end
- end, [], Atts),
-
- Rev = case DigestedAtts of
- Atts2 when length(Atts) =/= length(Atts2) ->
- % We must have old style non-md5 attachments
- list_to_binary(integer_to_list(couch_util:rand32()));
- Atts2 ->
- OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end,
- SigTerm = [Deleted, OldStart, OldRev, Body, Atts2],
- couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}]))
- end,
-
- Doc#doc{revs = {OldStart + 1, [Rev | OldRevs]}}.
-
-
-doc_to_fdb(TxDb, #doc{} = Doc) ->
- #doc{
- id = Id,
- revs = {Start, [Rev | _]},
- body = Body,
- atts = Atts,
- deleted = Deleted
- } = Doc,
- Key = {<<"revs">>, Id, Start, Rev},
- KeyBin = fabric2_db:pack(TxDb, Key),
- Val = {Body, Atts, Deleted},
- {KeyBin, term_to_binary(Val, [{minor_version, 1}])}.
-
-
-fdb_to_doc(_TxDb, DocId, Pos, Path, Bin) when is_binary(Bin) ->
- {Body, Atts, Deleted} = binary_to_term(Bin, [safe]),
- #doc{
- id = DocId,
- revs = {Pos, Path},
- body = Body,
- atts = Atts,
- deleted = Deleted
- };
-fdb_to_doc(_TxDb, _DocId, _Pos, _Path, not_found) ->
- {not_found, missing}.
-
-
-fdi_to_fdb(TxDb, #full_doc_info{} = FDI) ->
- #full_doc_info{
- id = Id,
- deleted = Deleted,
- rev_tree = RevTree
- } = flush_tree(FDI),
-
- Key = {<<"docs">>, Id},
- KeyBin = fabric2_db:pack(TxDb, Key),
- RevTreeBin = term_to_binary(RevTree, [{minor_version, 1}]),
- ValTuple = {Deleted, RevTreeBin, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}},
- Val = fabric2_db:pack_vs(TxDb, ValTuple),
- {KeyBin, Val}.
-
-
-fdb_to_fdi(TxDb, Id, Bin) when is_binary(Bin) ->
- {Deleted, RevTreeBin, {versionstamp, V, B}} = fabric2_db:unpack(TxDb, Bin),
- RevTree = binary_to_term(RevTreeBin, [safe]),
- UpdateSeq = <<V:64/big, B:16/big>>,
- #full_doc_info{
- id = Id,
- deleted = Deleted,
- rev_tree = RevTree,
- update_seq = UpdateSeq
- };
-fdb_to_fdi(_TxDb, _Id, not_found) ->
- not_found.
-
-
-flush_tree(FDI) ->
- #full_doc_info{
- rev_tree = Unflushed
- } = FDI,
-
- Flushed = couch_key_tree:map(fun(_Rev, Value) ->
- case Value of
- #doc{deleted = Del} -> #leaf{deleted = Del};
- _ -> Value
- end
- end, Unflushed),
-
- FDI#full_doc_info{
- rev_tree = Flushed
- }.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
new file mode 100644
index 000000000..4764331ae
--- /dev/null
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -0,0 +1,490 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_fdb).
+
+
+-export([
+ init/3,
+
+ create/1,
+ open/1,
+ delete/1,
+ exists/1,
+ is_current/1,
+
+ get_info/1,
+ get_config/1,
+ set_config/3,
+
+ get_stat/2,
+ incr_stat/3,
+
+ get_full_doc_info/2,
+ get_doc_body/3,
+
+ store_doc/3,
+
+ get_changes/2
+]).
+
+
+% This will eventually be the `\xFFmetadataVersion` key that is
+% currently only available in FoundationDB master.
+%
+% https://forums.foundationdb.org/t/a-new-tool-for-managing-layer-metadata/1191
+%
+% Until then we'll fake the same behavior using a randomish
+% key for tracking metadata changse. Once we get to the
+% new feature this will be more performant by updating
+% this define.
+-define(METADATA_VERSION_KEY, <<"$metadata_version_key$">>).
+
+
+% Prefix Definitions
+
+-define(CLUSTER_CONFIG, 0).
+-define(ALL_DBS, 1).
+-define(DBS, 15).
+-define(DB_CONFIG, 16).
+-define(DB_STATS, 17).
+-define(DB_ALL_DOCS, 18).
+-define(DB_CHANGES, 19).
+-define(DB_DOCS, 20).
+-define(DB_REVS, 21).
+
+
+% Various utility macros
+
+-define(REQUIRE_TX(Db), {erlfdb_transaction, _} = maps:get(Db, tx)).
+-define(REQUIRE_CURRENT(Db), true = db_is_current(Db)).
+
+-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}).
+
+
+init(Tx, DbName, Options) ->
+ Root = erlfdb_directory:get_root(),
+ CouchDB = erlfdb_directory:create_or_open(Root, [<<"couchdb">>]),
+ Prefix = erlfdb_directory:get_name(CouchDB),
+ #{
+ name => DbName,
+ tx => Tx,
+ layer_prefix => Prefix,
+ options => Options
+ }.
+
+
+create(#{} = Db) ->
+ ?REQUIRE_TX(Db),
+ #{
+ name := DbName,
+ tx := Tx,
+ layer_prefix := LayerPrefix
+ } = Db,
+
+ % Eventually DbPrefix will be HCA allocated. For now
+ % we're just using the DbName so that debugging is easier.
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
+ DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix),
+ ets:set(Tx, DbKey, DbPrefix),
+
+ UUID = fabric2_util:uuid(),
+
+ Defaults = [
+ {?DB_CONFIG, <<"uuid">>, UUID},
+ {?DB_CONFIG, <<"revs_limit">>, ?uint2bin(1000)},
+ {?DB_CONFIG, <<"security_doc">>, <<"{}">>},
+ {?DB_STATS, <<"doc_count">>, ?uint2bin(0)},
+ {?DB_STATS, <<"doc_del_count">>, ?uint2bin(0)},
+ {?DB_STATS, <<"doc_design_count">>, ?uint2bin(0)},
+ {?DB_STATS, <<"doc_local_count">>, ?uint2bin(0)},
+ {?DB_STATS, <<"size">>, ?uint2bin(2)}
+ ],
+ lists:foreach(fun({P, K, V}) ->
+ Key = erlfdb_tuple:pack({P, K}, DbPrefix),
+ erlfdb:set(Tx, Key, V)
+ end, Defaults),
+
+ Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
+
+ Db#{
+ uuid => UUID,
+ db_prefix => DbPrefix,
+ version => Version,
+ revs_limit => 1000,
+ security_doc => {[]}
+ validate_doc_update_funs => [],
+
+ user_ctx => #user_ctx{},
+
+ before_doc_update => undefined,
+ after_doc_update => undefined
+ % All other db things as we add features
+ }.
+
+
+open(#{} = Db0) ->
+ ?REQUIRE_TX(Db0),
+ #{
+ name := DbName,
+ tx := Tx,
+ layer_prefix := LayerPrefix
+ } = Db0,
+
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
+ DbPrefix = case erlfdb:wait(erlfdb:get(Tx, DbKey)) of
+ Bin when is_binary(Bin) -> Bin;
+ not_found -> erlang:error(database_does_not_exist)
+ end,
+
+ Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
+
+ Db1 = Db0#{
+ db_preix => DbPrefix,
+ version => Version,
+
+ user_ctx => #user_ctx{},
+
+ % Place holders until we implement these
+ % bits.
+ validate_doc_upate_funs => [],
+ before_doc_update => undefined,
+ after_doc_read => undefined
+ },
+
+ Config = db_get_config(Db1),
+ lists:foldl(fun({Key, Val}) ->
+ case Key of
+ <<"uuid">> ->
+ Db1#{uuid => Val};
+ <<"revs_limit">> ->
+ Db1#{revs_limit => ?bin2uint(Val)};
+ <<"security_doc">> ->
+ Db1#{security_doc => ?JSON_DECODE(Val)}
+ end
+ end, Db1, db_get_config(Db1)).
+
+
+delete(#{} = Db) ->
+ ?REQUIRE_CURRENT(Db),
+ #{
+ name := DbName,
+ tx := Tx,
+ layer_prefix := LayerPrefix,
+ db_prefix := DbPrefix
+ } = Db,
+
+ DbKey = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix),
+ erlfdb:clear(Tx, DbKey),
+ erlfdb:clear_range_startswith(Tx, DbPrefix),
+ bump_metadata_version(),
+ ok.
+
+
+exists(#{name := DbName} = Db) when is_binary(DbName) ->
+ ?REQUIRE_TX(Db),
+ #{
+ tx := Tx,
+ layer_prefix := LayerPrefix
+ } = Db,
+
+ DbKey = erlfdb_tuple:pack({?ALL_DBS, DbName}, LayerPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, DbKey)) of
+ Bin when is_binary(Bin) -> true;
+ not_found -> false
+ end.
+
+
+is_current(#{} = Db) ->
+ ?REQUIRE_TX(Db),
+ #{
+ name := DbName,
+ md_version := MetaDataVersion
+ } = Db,
+
+ case erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)) of
+ MetaDataVersion -> true;
+ _NewVersion -> false
+ end.
+
+
+get_info(#{} = Db) ->
+ ?REQUIRE_CURRENT(Db),
+ #{
+ name := DbName,
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+
+ {CStart, CEnd} = erlfdb_tuple:pack({?DB_CHANGES}, DbPrefix),
+ ChangesFuture = erlfdb:get_range(Tx, CStart, CEnd, [
+ {streaming_mode, exact},
+ {limit, 1},
+ {reverse, true}
+ ]),
+
+ StatsPrefix = erlfdb_tuple:pack(?DB_STATS, DbPrefix),
+ MetaFuture = erlfdb:get_range_startswith(Tx, StatsPrefix),
+
+ RawSeq = case erlfdb:wait(ChangesFuture) of
+ [] ->
+ <<0:80>>;
+ [{SeqKey, _}] ->
+ {?DB_CHANGES, SeqBin} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
+ SeqBin
+ end,
+ CProp = {update_seq, fabric2_util:to_hex(RawSeq)},
+
+ MProps = lists:flatmap(fun({K, V}) ->
+ case erlfdb_tuple:unpack(K, DbPrefix) of
+ {?DB_STATS, <<"doc_count">>} ->
+ [{doc_count, ?bin2uint(V)}];
+ {?DB_STATS, <<"doc_del_count">>} ->
+ [{doc_del_count, ?bin2uint(V)}];
+ {?DB_STATS, <<"size">>} ->
+ Val = ?bin2uint(V),
+ [
+ {other, {[{data_size, Val}]}},
+ {sizes, {[
+ {active, 0},
+ {external, Val},
+ {file, 0}
+ ]}}
+ ];
+ {?DB_STATS, _} ->
+ []
+ end
+ end, erlfdb:wait(MetaFuture)),
+
+ [CProp | MProps].
+
+
+get_config(#{} = Db) ->
+ ?REQUIRE_CURRENT(Db),
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+
+ {Start, End} = erlfdb_tuple:pack({?DB_CONFIG}, DbPrefix),
+ Future = erlfdb:get_range(Tx, Start, End),
+
+ lists:map(fun(K, V) ->
+ {?DB_CONFIG, Key} = erlfdb_tuple:unpack(K, DbPrefix),
+ {Key, V}
+ end, erlfdb:wait(Future)).
+
+
+set_config(#{} = Db, ConfigKey, ConfigVal) ->
+ ?REQUIRE_CURRENT(Db),
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+
+ Key = erlfdb_tuple:pack({?DB_CONFIG, ConfigKey}, DbPrefix),
+ erlfdb:set(Tx, Key, ConfigVal).
+
+
+get_stat(#{} = Db, StatKey) ->
+ ?REQUIRE_CURRENT(Db),
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+
+ Key = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
+
+ % Might need to figure out some sort of type
+ % system here. Uints are because stats are all
+ % atomic op adds for the moment.
+ ?bin2uint(erlfdb:wait(erlfdb:get(Tx, Key))),
+ bump_metadata_version(Tx).
+
+
+incr_stat(_Db, _Statey, 0) ->
+ ok;
+
+incr_stat(#{} = Db, StatKey, Increment) when is_integer(Increment) ->
+ ?REQUIRE_CURRENT(Db),
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+
+ Key = erlfdb_tuple:pack({?DB_STATS, StatKey}, DbPrefix),
+ erlfdb:add(Tx, Key, Increment).
+
+
+get_full_doc_info(#{} = Db, DocId) ->
+ ?REQUIRE_CURRENT(Db),
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+
+ Key = erlfdb_tuple:pack({?DB_DOCS, DocId}, DbPrefix),
+ Val = erlfdb:wait(erlfdb:get(Tx, Key)),
+ fdb_to_fdi(Db, DocId, Val).
+
+
+get_doc_body(#{} = Db, DocId, {Pos, [Rev | _]} = Path) ->
+ ?REQUIRE_CURRENT(Db),
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+
+ Key = erlfdb_tupe:pack({?DB_REVS, DocId, Pos, Rev}, DbPrefix),
+ Val = erlfdb:wait(erlfdb:get(Tx, Key)),
+ fdb_to_doc(Db, DocId, Pos, Path, Val).
+
+
+store_doc(#{} = Db, #full_doc_info{} = FDI, #doc{} = Doc) ->
+ ?REQUIRE_CURRENT(Db),
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+
+ #full_doc_info{
+ id = DocId,
+ update_seq = OldUpdateSeq
+ } = FDI,
+
+ #doc{
+ revs = {RevStart, [Rev | _]}
+ } = Doc,
+
+ % Delete old entry in changes feed
+ OldSeqKey = erlfdb_tuple:pack({?DB_CHANGES, OldUpdateSeq}, DbPrefix),
+ erlfdb:clear(Tx, OldSeqKey),
+
+ % Add new entry to changes feed
+ NewSeqKey = erlfdb_tuple:pack({?DB_CHANGES, ?UNSET_VS}, DbPrefix),
+ erlfdb:set_versionstamped_key(Tx, NewSeqKey, DocId),
+
+ % Write document data
+ {NewDocKey, NewDocVal} = doc_to_fdb(Db, Doc),
+ erlfdb:set(Tx, NewDocKey, NewDocVal),
+
+ % Update revision tree entry
+ {NewFDIKey, NewFDIVal} = fdi_to_fdb(TxDb, FDI),
+ erlfdb:set_versionstamped_value(Tx, NewFDIKey, NewFDIVal).
+
+
+get_changes(#{} = Db, Options) ->
+ ?REQUIRE_CURRENT(Db),
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+
+ {CStart, CEnd} = erlfdb_tuple:pack({?DB_CHANGES}, DbPrefix),
+ Future = erlfdb:get_range(Tx, CStart, CEnd, Options),
+ lists:map(fun({Key, Val}) ->
+ {?DB_CHANGES, Seq} = erlfdb_tuple:unpack(Key, DbPrefix),
+ {fabric2_util:to_hex(Seq), Val}
+ end, erlfdb:wait(Future)).
+
+
+
+
+bump_metadata_version(Tx) ->
+ % The 14 zero bytes is pulled from the PR for adding the
+ % metadata version key. Not sure why 14 bytes when version
+ % stamps are only 80, but whatever for now.
+ erlfdb:set_versionstamped_value(Tx, ?METADATA_VERSION_KEY, <<0:112>>).
+
+
+doc_to_fdb(Db, #doc{} = Doc) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ #doc{
+ id = Id,
+ revs = {Start, [Rev | _]},
+ body = Body,
+ atts = Atts,
+ deleted = Deleted
+ } = Doc,
+
+ Key = erlfdb_tuple:pack({?DB_REVS, Id, Start, Rev}, DbPrefix),
+ Val = {Body, Atts, Deleted},
+ {Key, term_to_binary(Val, [{minor_version, 1}])}.
+
+
+fdb_to_doc(_Db, DocId, Pos, Path, Bin) when is_binary(Bin) ->
+ {Body, Atts, Deleted} = binary_to_term(Bin, [safe]),
+ #doc{
+ id = DocId,
+ revs = {Pos, Path},
+ body = Body,
+ atts = Atts,
+ deleted = Deleted
+ };
+fdb_to_doc(_Db, _DocId, _Pos, _Path, not_found) ->
+ {not_found, missing}.
+
+
+fdi_to_fdb(Db, #full_doc_info{} = FDI) ->
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+
+ #full_doc_info{
+ id = Id,
+ deleted = Deleted,
+ rev_tree = RevTree
+ } = flush_tree(FDI),
+
+ Key = erlfdb_tuple:pack({?DB_DOCS, Id}, DbPrefix),
+ RevTreeBin = term_to_binary(RevTree, [{minor_version, 1}]),
+ ValTuple = {
+ Deleted,
+ RevTreeBin,
+ {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}
+ },
+ Val = erlfdb_tuple:pack_vs(ValTuple),
+ {Key, Val}.
+
+
+flush_tree(FDI) ->
+ #full_doc_info{
+ rev_tree = Unflushed
+ } = FDI,
+
+ Flushed = couch_key_tree:map(fun(_Rev, Value) ->
+ case Value of
+ #doc{deleted = Del} -> #leaf{deleted = Del};
+ _ -> Value
+ end
+ end, Unflushed),
+
+ FDI#full_doc_info{
+ rev_tree = Flushed
+ }.
+
+
+fdb_to_fdi(_Db, Id, Bin) when is_binary(Bin) ->
+ {Deleted, RevTreeBin, {versionstamp, V, B}} = erlfdb_tuple:unpack(Bin),
+ RevTree = binary_to_term(RevTreeBin, [safe]),
+ UpdateSeq = <<V:64/big, B:16/big>>,
+ #full_doc_info{
+ id = Id,
+ deleted = Deleted,
+ rev_tree = RevTree,
+ update_seq = UpdateSeq
+ };
+fdb_to_fdi(_Db, _Id, not_found) ->
+ not_found. \ No newline at end of file
diff --git a/src/fabric/src/fabric_security.erl b/src/fabric/src/fabric2_security.erl
index b3f13886d..b3f13886d 100644
--- a/src/fabric/src/fabric_security.erl
+++ b/src/fabric/src/fabric2_security.erl
diff --git a/src/fabric/src/fabric2_server.erl b/src/fabric/src/fabric2_server.erl
new file mode 100644
index 000000000..58223315a
--- /dev/null
+++ b/src/fabric/src/fabric2_server.erl
@@ -0,0 +1,91 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_server).
+-behaviour(gen_server).
+-vsn(1).
+
+
+-export([
+ start_link/0,
+ fetch/1,
+ store/1
+]).
+
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-define(CLUSTER_FILE, "/usr/local/etc/foundationdb/fdb.cluster").
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+fetch(DbName) when is_binary(DbName) ->
+ case ets:lookup(?MODULE, DbName) of
+ [{DbName, #{} = Db] -> Db;
+ [] -> undefined
+ end.
+
+
+store(#{name := DbName} = Db0) when is_binary(DbName) ->
+ Db1 = Db0#{
+ tx => undefined,
+ user_ctx => #user_ctx{}
+ },
+ true = ets:insert(?MODULE, {DbName, Db1}}),
+ Db1.
+
+
+init(_) ->
+ ets:new(?MODULE, [
+ public,
+ named_table,
+ {read_concurrency, true},
+ {write_concurrency, true}
+ ]),
+
+ ClusterStr = config:get("erlfdb", "cluster_file", ?CLUSTER_FILE),
+ Db = erlfdb:open(iolist_to_binary(ClusterStr)),
+ application:set_env(fabric, db, Db),
+ init_cluster(Db),
+
+ {ok, nil}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
diff --git a/src/fabric/src/fabric_sup.erl b/src/fabric/src/fabric2_sup.erl
index 54636638c..00eea3ed7 100644
--- a/src/fabric/src/fabric_sup.erl
+++ b/src/fabric/src/fabric2_sup.erl
@@ -10,7 +10,7 @@
% License for the specific language governing permissions and limitations under
% the License.
--module(fabric_sup).
+-module(fabric2_sup).
-behaviour(supervisor).
-vsn(1).
@@ -36,8 +36,8 @@ init([]) ->
},
Children = [
#{
- id => fabric_server,
- start => {fabric_server, start_link, []}
+ id => fabric2_server,
+ start => {fabric2_server, start_link, []}
}
],
{ok, {Flags, Children}}.
diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl
new file mode 100644
index 000000000..1c47222f6
--- /dev/null
+++ b/src/fabric/src/fabric2_util.erl
@@ -0,0 +1,109 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_util).
+
+
+-export([
+ transactional/2,
+ transactional/3,
+
+ get_db_handle/0,
+
+ user_ctx_to_json/1,
+
+ uuid/0,
+
+ to_hex/1,
+
+ debug_cluster/0,
+ debug_cluster/2
+]).
+
+
+-define(PDICT_DB_KEY, '$erlfdb_handle').
+
+
+trasactional(Fun) when is_function(Fun, 1) ->
+ Db = get_db_handle(),
+ erlfdb:transactional(Db, Fun).
+
+
+get_db_handle() ->
+ case get(?PDICT_DB_KEY) of
+ undefined ->
+ {ok, Db} = application:get_env(fabric, db),
+ put(?PDICT_DB_KEY, Db),
+ Db;
+ Db ->
+ Db
+ end.
+
+
+user_ctx_to_json(Db) ->
+ UserCtx = fabric2_db:get_user_ctx(Db),
+ {[
+ {<<"db">>, fabric2_db:name(Db)},
+ {<<"name">>, Ctx#user_ctx.name},
+ {<<"roles">>, Ctx#user_ctx.roles}
+ ]}.
+
+
+
+uuid() ->
+ to_hex(crypto:strong_rand_bytes(16)).
+
+
+to_hex(Bin) ->
+ list_to_binary(to_hex_int(Bin)).
+
+
+to_hex_int(<<>>) ->
+ [];
+to_hex_int(<<Hi:4, Lo:4, Rest/binary>>) ->
+ [nibble_to_hex(Hi), nibble_to_hex(Lo) | to_hex(Rest)];
+
+
+nibble_to_hex(I) ->
+ case I of
+ 0 -> $0;
+ 1 -> $1;
+ 2 -> $2;
+ 3 -> $3;
+ 4 -> $4;
+ 5 -> $5;
+ 6 -> $6;
+ 7 -> $7;
+ 8 -> $8;
+ 9 -> $9;
+ 10 -> $a;
+ 11 -> $b;
+ 12 -> $c;
+ 13 -> $d;
+ 14 -> $e;
+ 15 -> $f
+ end.
+
+
+debug_cluster() ->
+ debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
+
+
+debug_cluster(Start, End) ->
+ transactional(fun(Tx) ->
+ lists:foreach(fun({Key, Val}) ->
+ io:format("~s => ~s~n", [
+ string:pad(erlfdb_util:repr(Key), 60),
+ erlfdb_util:repr(Val)
+ ])
+ end, erlfdb:get_range(Tx, Start, End))
+ end). \ No newline at end of file
diff --git a/src/fabric/src/fabric_server.erl b/src/fabric/src/fabric_server.erl
deleted file mode 100644
index 45e091f48..000000000
--- a/src/fabric/src/fabric_server.erl
+++ /dev/null
@@ -1,139 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_server).
--behaviour(gen_server).
--vsn(1).
-
-
--export([
- start_link/0,
- transactional/1,
- get_dir/1,
-
- debug_cluster/0,
- debug_cluster/2
-]).
-
-
--export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- code_change/3
-]).
-
-
--record(st, {
- db
-}).
-
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
-
-transactional(Fun) when is_function(Fun, 1) ->
- [{'$handle$', Db}] = ets:lookup(?MODULE, '$handle$'),
- erlfdb:transactional(Db, Fun).
-
-
-get_dir(Name) ->
- case ets:lookup(?MODULE, Name) of
- [{Name, Dir}] -> Dir;
- [] -> not_found
- end.
-
-
-debug_cluster() ->
- debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
-
-
-debug_cluster(Start, End) ->
- transactional(fun(Tx) ->
- lists:foreach(fun({Key, Val}) ->
- io:format("~s => ~s~n", [
- string:pad(erlfdb_util:repr(Key), 60),
- erlfdb_util:repr(Val)
- ])
- end, erlfdb:get_range(Tx, Start, End))
- end).
-
-
-init(_) ->
- ets:new(?MODULE, [
- protected,
- named_table,
- {read_concurrency, true}
- ]),
-
- ClusterStr = config:get("erlfdb", "cluster_file", "/usr/local/etc/foundationdb/fdb.cluster"),
- Db = erlfdb:open(iolist_to_binary(ClusterStr)),
- Dirs = init_cluster(Db),
-
- ets:insert(?MODULE, {'$handle$', Db}),
- lists:foreach(fun({K, V}) ->
- ets:insert(?MODULE, {K, V})
- end, Dirs),
-
- {ok, #st{db = Db}}.
-
-
-terminate(_, _St) ->
- ok.
-
-
-handle_call(Msg, _From, St) ->
- {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
-
-
-handle_cast(Msg, St) ->
- {stop, {bad_cast, Msg}, St}.
-
-
-handle_info(Msg, St) ->
- {stop, {bad_info, Msg}, St}.
-
-
-code_change(_OldVsn, St, _Extra) ->
- {ok, St}.
-
-
-
-init_cluster(Db) ->
- Dirs = erlfdb:transactional(Db, fun(Tx) ->
- Root = erlfdb_directory:root(),
- CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
- Dbs = erlfdb_directory:create_or_open(Tx, CouchDB, [<<"dbs">>]),
- Config = erlfdb_directory:create_or_open(Tx, CouchDB, [
- <<"meta">>,
- <<"config">>
- ]),
- [
- {root, Root},
- {config, Config},
- {dbs, Dbs}
- ]
- end),
- drain_ready(Dirs).
-
-
-drain_ready(Dirs) ->
- receive
- {Ref, ready} when is_reference(Ref) ->
- drain_ready(Dirs)
- after 100 ->
- Dirs
- end. \ No newline at end of file