summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2019-03-08 16:34:43 -0600
committerPaul J. Davis <paul.joseph.davis@gmail.com>2019-03-08 17:15:44 -0600
commitb24a593613ab64fde8087fbe551ee5adc0ce97f7 (patch)
treecb3db58c0f5810db4f426e335e4fc1cd6b2f38e6
parent730d671a8405fc250942e8e6241080b8e140dc63 (diff)
downloadcouchdb-b24a593613ab64fde8087fbe551ee5adc0ce97f7.tar.gz
Implement update_docs
This is a first-pass attempt at an implementation of update_docs. There's probably a lot wrong here, and a number of things are expected to blow up if features like VDUs are used.
-rw-r--r--FDB_NOTES.md45
-rw-r--r--src/chttpd/src/chttpd_db.erl43
-rw-r--r--src/fabric/src/fabric2.erl237
-rw-r--r--src/fabric/src/fabric2_db.erl162
-rw-r--r--src/fabric/src/fabric2_doc.erl447
5 files changed, 821 insertions, 113 deletions
diff --git a/FDB_NOTES.md b/FDB_NOTES.md
new file mode 100644
index 000000000..9278a2f0f
--- /dev/null
+++ b/FDB_NOTES.md
@@ -0,0 +1,45 @@
+Things of Note
+===
+
+
+1. If a replication sends us two revisions A and B where one is an
+ ancestor of the other, we likely have divergent behavior. However,
+ this should never happen In Theory.
+
+2. For multiple updates to the same document in a _bulk_docs (or if they
+   just happen to be in the same update batch in non-fdb CouchDB),
+   we likely have subtly different behavior.
+
+3. I'm relying on repeated reads in an fdb transaction to be "cheap"
+ in that the reads would be cached in the fdb_transaction object.
+ This needs to be checked for certainty but that appeared to
+ be how things behaved in testing.
+
+4. When attempting to create a doc from scratch in an interactive_edit
+ update, with revisions specified *and* attachment stubs, the reported
+ error is now a conflict. Previously the missing_stubs error was
+ raised earlier.
+
+5. There may be a difference in behavior if there are no VDU functions
+ set on a db and no design documents in a batch. This is because in
+ this situation we don't run the prep_and_validate code on pre-fdb
+ CouchDB. The new code always checks stubs before merging revision trees.
+ I'm sure the old way would fail somehow, but it would fail further on
+ which means we may have failed with a different reason (conflict, etc)
+ before we got to the next place we check for missing stubs.
+
+6. I still need to add paging for the DirectoryLayer results so that we
+ can use that for the _all_dbs end point.
+
+7. For multi-doc updates we'll need to investigate user versions on
+ versionstamps within a transaction. Also this likely prevents the
+ ability to have multiple updates to the same doc in a single
+ _bulk_docs transaction
+
+8. I'm not currently decreasing size after updating an existing document.
+ It looks like we're gonna have to do some sort of cleverness with the
+ existing FDI sizes field maybe?
+
+9. I'm cheating really bad with term_to_binary and ignoring serialization
+ but given that's all going to change I'm not too concerned about it
+ at this point. \ No newline at end of file
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index cceeac0d7..b299ef2d0 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -370,16 +370,13 @@ delete_db_req(#httpd{}=Req, DbName) ->
end.
do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) ->
- Db = #{
- name => DbName,
- user_ctx => Ctx
- },
+ Db = fabric2:open_db(DbName, [{user_ctx, Ctx}]),
Fun(Req, Db).
-db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) ->
+db_req(#httpd{method='GET',path_parts=[DbName]}=Req, Db) ->
% measure the time required to generate the etag, see if it's worth it
T0 = os:timestamp(),
- {ok, DbInfo} = fabric2:get_db_info(DbName),
+ {ok, DbInfo} = fabric2:get_db_info(Db),
DeltaT = timer:now_diff(os:timestamp(), T0) / 1000,
couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
send_json(Req, {DbInfo});
@@ -402,7 +399,7 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) ->
"ok" ->
% async_batching
spawn(fun() ->
- case catch(fabric:update_doc(Db, Doc2, Options)) of
+ case catch(fabric2:update_doc(Db, Doc2, Options)) of
{ok, _} ->
chttpd_stats:incr_writes(),
ok;
@@ -422,7 +419,7 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) ->
% normal
DocUrl = absolute_uri(Req, [$/, couch_util:url_encode(DbName),
$/, couch_util:url_encode(DocId)]),
- case fabric:update_doc(Db, Doc2, Options) of
+ case fabric2:update_doc(Db, Doc2, Options) of
{ok, NewRev} ->
chttpd_stats:incr_writes(),
HttpCode = 201;
@@ -497,7 +494,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>], user_ctx=Ctx}=Req,
true -> [all_or_nothing|Options];
_ -> Options
end,
- case fabric:update_docs(Db, Docs, Options2) of
+ case fabric2:update_docs(Db, Docs, Options2) of
{ok, Results} ->
% output the results
chttpd_stats:incr_writes(length(Results)),
@@ -516,7 +513,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>], user_ctx=Ctx}=Req,
send_json(Req, 417, ErrorsJson)
end;
false ->
- case fabric:update_docs(Db, Docs, [replicated_changes|Options]) of
+ case fabric2:update_docs(Db, Docs, [replicated_changes|Options]) of
{ok, Errors} ->
chttpd_stats:incr_writes(length(Docs)),
ErrorsJson = lists:map(fun update_doc_result_to_json/1, Errors),
@@ -960,7 +957,7 @@ db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) ->
NewDoc = Doc#doc{
atts = UpdatedAtts ++ OldAtts2
},
- case fabric:update_doc(Db, NewDoc, Options) of
+ case fabric2:update_doc(Db, NewDoc, Options) of
{ok, NewRev} ->
chttpd_stats:incr_writes(),
HttpCode = 201;
@@ -978,11 +975,10 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
#doc_query_args{
update_type = UpdateType
} = parse_doc_query(Req),
- DbName = couch_db:name(Db),
- couch_db:validate_docid(Db, DocId),
+ DbName = fabric2_db:name(Db),
+ couch_doc:validate_docid(DocId),
- W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
- Options = [{user_ctx,Ctx}, {w,W}],
+ Options = [{user_ctx, Ctx}],
Loc = absolute_uri(Req, [$/, couch_util:url_encode(DbName),
$/, couch_util:url_encode(DocId)]),
@@ -1010,7 +1006,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
Doc = couch_doc_from_req(Req, Db, DocId, chttpd:json_body(Req)),
spawn(fun() ->
- case catch(fabric:update_doc(Db, Doc, Options)) of
+ case catch(fabric2:update_doc(Db, Doc, Options)) of
{ok, _} ->
chttpd_stats:incr_writes(),
ok;
@@ -1044,7 +1040,7 @@ db_doc_req(#httpd{method='COPY', user_ctx=Ctx}=Req, Db, SourceDocId) ->
% open old doc
Doc = couch_doc_open(Db, SourceDocId, SourceRev, []),
% save new doc
- case fabric:update_doc(Db,
+ case fabric2:update_doc(Db,
Doc#doc{id=TargetDocId, revs=TargetRevs}, [{user_ctx,Ctx}]) of
{ok, NewTargetRev} ->
chttpd_stats:incr_writes(),
@@ -1241,15 +1237,14 @@ send_updated_doc(Req, Db, DocId, Doc, Headers) ->
send_updated_doc(#httpd{user_ctx=Ctx} = Req, Db, DocId, #doc{deleted=Deleted}=Doc,
Headers, UpdateType) ->
- W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
Options =
case couch_httpd:header_value(Req, "X-Couch-Full-Commit") of
"true" ->
- [full_commit, UpdateType, {user_ctx,Ctx}, {w,W}];
+ [full_commit, UpdateType, {user_ctx,Ctx}];
"false" ->
- [delay_commit, UpdateType, {user_ctx,Ctx}, {w,W}];
+ [delay_commit, UpdateType, {user_ctx,Ctx}];
_ ->
- [UpdateType, {user_ctx,Ctx}, {w,W}]
+ [UpdateType, {user_ctx,Ctx}]
end,
{Status, {etag, Etag}, Body} = update_doc(Db, DocId,
#doc{deleted=Deleted}=Doc, Options),
@@ -1269,7 +1264,7 @@ http_code_from_status(Status) ->
update_doc(Db, DocId, #doc{deleted=Deleted, body=DocBody}=Doc, Options) ->
{_, Ref} = spawn_monitor(fun() ->
- try fabric:update_doc(Db, Doc, Options) of
+ try fabric2:update_doc(Db, Doc, Options) of
Resp ->
exit({exit_ok, Resp})
catch
@@ -1339,7 +1334,7 @@ couch_doc_from_req(Req, _Db, DocId, #doc{revs=Revs} = Doc) ->
end,
Doc#doc{id=DocId, revs=Revs2};
couch_doc_from_req(Req, Db, DocId, Json) ->
- Doc = couch_db:doc_from_json_obj_validate(Db, Json),
+ Doc = couch_doc:from_json_obj_validate(Json, fabric2_db:name(Db)),
couch_doc_from_req(Req, Db, DocId, Doc).
@@ -1551,7 +1546,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
atts = NewAtt ++ [A || A <- Atts, couch_att:fetch(name, A) /= FileName]
},
W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
- case fabric:update_doc(Db, DocEdited, [{user_ctx,Ctx}, {w,W}]) of
+ case fabric2:update_doc(Db, DocEdited, [{user_ctx,Ctx}, {w,W}]) of
{ok, UpdatedRev} ->
chttpd_stats:incr_writes(),
HttpCode = 201;
diff --git a/src/fabric/src/fabric2.erl b/src/fabric/src/fabric2.erl
index 924fb8979..59b27d543 100644
--- a/src/fabric/src/fabric2.erl
+++ b/src/fabric/src/fabric2.erl
@@ -24,6 +24,9 @@
delete_db/1,
delete_db/2,
+ open_db/1,
+ open_db/2,
+
get_db_info/1,
get_doc_count/1,
get_doc_count/2,
@@ -32,7 +35,7 @@
%% set_revs_limit/3,
get_security/1,
- set_security/2
+ set_security/2,
%% get_purge_infos_limit/1,
%% set_purge_infos_limit/3,
@@ -51,9 +54,10 @@
%%
%% get_missing_revs/2,
%% get_missing_revs/3,
- %%
- %% update_doc/3,
- %% update_docs/3,
+
+ update_doc/3,
+ update_docs/3
+
%% purge_docs/3,
%%
%% att_receiver/2,
@@ -100,7 +104,7 @@ create_db(DbName, _Options) ->
fabric_server:transactional(fun(Tx) ->
try
DbDir = erlfdb_directory:create(Tx, DbsDir, DbName),
- init_db(Tx, DbDir)
+ fabric2_db:init(DbName, Tx, DbDir)
catch error:{erlfdb_directory, {create_error, path_exists, _}} ->
{error, file_exists}
end
@@ -122,60 +126,76 @@ delete_db(DbName, _Options) ->
end.
-get_db_info(DbName) ->
- [DbDir, MetaRows, LastChangeRow] = fabric_server:transactional(fun(Tx) ->
- DbDir = open_db(Tx, DbName),
- Meta = erlfdb_directory:pack(DbDir, {<<"meta">>, <<"stats">>}),
- MetaFuture = erlfdb:get_range_startswith(Tx, Meta),
- {CStart, CEnd} = erlfdb_directory:range(DbDir, {<<"changes">>}),
- ChangesFuture = erlfdb:get_range(Tx, CStart, CEnd, [
- {streaming_mode, exact},
- {limit, 1},
- {reverse, true}
- ]),
- [DbDir] ++ erlfdb:wait_for_all([MetaFuture, ChangesFuture])
+open_db(DbName) ->
+ open_db(DbName, []).
+
+
+open_db(DbName, Options) ->
+ fabric2_db:open(DbName, Options).
+
+
+get_db_info(DbName) when is_binary(DbName) ->
+ get_db_info(open_db(DbName));
+
+get_db_info(Db) ->
+ DbProps = fabric2_db:with_tx(Db, fun(TxDb) ->
+ {CStart, CEnd} = fabric2_db:range_bounds(TxDb, {<<"changes">>}),
+ ChangesFuture = fabric2_db:get_range(TxDb, {CStart}, {CEnd}, [
+ {streaming_mode, exact},
+ {limit, 1},
+ {reverse, true}
+ ]),
+
+ StatsPrefix = {<<"meta">>, <<"stats">>},
+ MetaFuture = fabric2_db:get_range_startswith(TxDb, StatsPrefix),
+
+ RawSeq = case erlfdb:wait(ChangesFuture) of
+ [] ->
+ <<0:80>>;
+ [{SeqKey, _}] ->
+ {<<"changes">>, SeqBin} = fabric2_db:unpack(TxDb, SeqKey),
+ SeqBin
+ end,
+ CProp = {update_seq, ?l2b(couch_util:to_hex(RawSeq))},
+
+ MProps = lists:flatmap(fun({K, V}) ->
+ case fabric2_db:unpack(TxDb, K) of
+ {_, _, <<"doc_count">>} ->
+ [{doc_count, ?bin2uint(V)}];
+ {_, _, <<"doc_del_count">>} ->
+ [{doc_del_count, ?bin2uint(V)}];
+ {_, _, <<"size">>} ->
+ Val = ?bin2uint(V),
+ [
+ {other, {[{data_size, Val}]}},
+ {sizes, {[
+ {active, 0},
+ {external, Val},
+ {file, 0}
+ ]}}
+ ];
+ _ ->
+ []
+ end
+ end, erlfdb:wait(MetaFuture)),
+
+ [CProp | MProps]
end),
+
BaseProps = [
{cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}},
{compact_running, false},
{data_size, 0},
- {db_name, DbName},
+ {db_name, fabric2_db:name(Db)},
{disk_format_version, 0},
{disk_size, 0},
{instance_start_time, <<"0">>},
{purge_seq, 0}
],
- WithMeta = lists:foldl(fun({KBin, VBin}, PropAcc) ->
- case erlfdb_directory:unpack(DbDir, KBin) of
- {_, _, <<"doc_count">>} ->
- Val = ?bin2uint(VBin),
- lists:keystore(doc_count, 1, PropAcc, {doc_count, Val});
- {_, _, <<"doc_del_count">>} ->
- Val = ?bin2uint(VBin),
- lists:keystore(doc_del_count, 1, PropAcc, {doc_del_count, Val});
- {_, _, <<"size">>} ->
- Val = ?bin2uint(VBin),
- Other = {other, {[{data_size, Val}]}},
- Sizes = {sizes, {[
- {active, 0},
- {external, Val},
- {file, 0}
- ]}},
- PA1 = lists:keystore(other, 1, PropAcc, Other),
- lists:keystore(sizes, 1, PA1, Sizes);
- _ ->
- PropAcc
- end
- end, BaseProps, MetaRows),
- RawSeq = case LastChangeRow of
- [] ->
- <<0:80>>;
- [{KBin, _}] ->
- {<<"changes">>, SeqBin} = erlfdb_directory:unpack(DbDir, KBin),
- SeqBin
- end,
- Seq = ?l2b(couch_util:to_hex(RawSeq)),
- {ok, lists:keystore(update_seq, 1, WithMeta, {update_seq, Seq})}.
+
+ {ok, lists:foldl(fun({Key, Val}, Acc) ->
+ lists:keystore(Key, 1, Acc, {Key, Val})
+ end, BaseProps, DbProps)}.
get_doc_count(DbName) ->
@@ -191,57 +211,96 @@ get_doc_count(DbName, <<"_design">>) ->
get_doc_count(DbName, <<"_local">>) ->
get_doc_count(DbName, <<"doc_local_count">>);
-get_doc_count(DbName, Key) ->
- fabric_server:transactional(fun(Tx) ->
- DbDir = open_db(Tx, DbName),
- Key = erlfdb_directory:pack(DbDir, {<<"meta">>, <<"stats">>, Key}),
- VBin = erlfdb:wait(erlfdb:get(Tx, Key)),
- ?bin2uint(VBin)
+get_doc_count(Db, Key) ->
+ fabric2_db:with_tx(Db, fun(TxDb) ->
+ Future = fabric2_db:get(TxDb, {<<"meta">>, <<"stats">>, Key}),
+ ?bin2uint(erlfdb:wait(Future))
end).
-get_security(DbName) ->
- SecJson = fabric_server:transactional(fun(Tx) ->
- DbDir = open_db(Tx, DbName),
- Tuple = {<<"meta">>, <<"config">>, <<"security_doc">>},
- Key = erlfdb_directory:pack(DbDir, Tuple),
- erlfdb:wait(erlfdb:get(Tx, Key))
+get_security(DbName) when is_binary(DbName) ->
+ get_security(open_db(DbName));
+
+get_security(Db) ->
+ Key = {<<"meta">>, <<"config">>, <<"security_doc">>},
+ SecJson = fabric2_db:with_tx(Db, fun(TxDb) ->
+ erlfdb:wait(fabric2_db:get(TxDb, Key))
end),
?JSON_DECODE(SecJson).
-set_security(DbName, ErlJson) ->
+set_security(DbName, ErlJson) when is_binary(DbName) ->
+ set_security(open_db(DbName), ErlJson);
+
+set_security(Db, ErlJson) ->
+ Key = {<<"meta">>, <<"config">>, <<"security_doc">>},
SecJson = ?JSON_ENCODE(ErlJson),
- fabric_server:transactional(fun(Tx) ->
- DbDir = open_db(Tx, DbName),
- Tuple = {<<"meta">>, <<"config">>, <<"security_doc">>},
- Key = erlfdb_directory:pack(DbDir, Tuple),
- erlfdb:set(Tx, Key, SecJson)
+ fabric2_db:with_tx(Db, fun(TxDb) ->
+ fabric2_db:set(TxDb, Key, SecJson)
end).
-init_db(Tx, DbDir) ->
- Defaults = [
- {{<<"meta">>, <<"config">>, <<"revs_limit">>}, ?uint2bin(1000)},
- {{<<"meta">>, <<"config">>, <<"security_doc">>}, ?DEFAULT_SECURITY},
- {{<<"meta">>, <<"stats">>, <<"doc_count">>}, ?uint2bin(0)},
- {{<<"meta">>, <<"stats">>, <<"doc_del_count">>}, ?uint2bin(0)},
- {{<<"meta">>, <<"stats">>, <<"doc_design_count">>}, ?uint2bin(0)},
- {{<<"meta">>, <<"stats">>, <<"doc_local_count">>}, ?uint2bin(0)},
- {{<<"meta">>, <<"stats">>, <<"size">>}, ?uint2bin(2)}
- ],
- lists:foreach(fun({K, V}) ->
- BinKey = erlfdb_directory:pack(DbDir, K),
- erlfdb:set(Tx, BinKey, V)
- end, Defaults).
+update_doc(Db, Doc, Options) ->
+ fabric2_db:with_tx(Db, fun(TxDb) ->
+ case fabric2_doc:update(TxDb, Doc, opts(Options)) of
+ {ok, []} ->
+ % replication no-op
+ #doc{revs = {Pos, [RevId | _]}} = doc(Db, Doc),
+ {ok, {Pos, RevId}};
+ {ok, NewRev} ->
+ {ok, NewRev};
+ {error, Error} ->
+ throw(Error)
+ end
+ end).
-open_db(Tx, DbName) ->
- % We'll eventually want to cache this in the
- % fabric_server ets table.
- DbsDir = fabric_server:get_dir(dbs),
- try
- erlfdb_directory:open(Tx, DbsDir, DbName)
- catch error:{erlfdb_directory, {open_error, path_missing, _}} ->
- erlang:error(database_does_not_exist)
+update_docs(DbName, Docs, Options) when is_binary(DbName) ->
+ update_docs(open_db(DbName, Options), Docs, Options);
+
+update_docs(Db, Docs, Options) ->
+ fabric2_db:with_tx(Db, fun(TxDb) ->
+ {Resps, Status} = lists:mapfoldl(fun(Doc, Acc) ->
+ case fabric2_doc:update(TxDb, Doc, opts(Options)) of
+ {ok, _} = Resp ->
+ {Resp, Acc};
+ {error, _} = Resp ->
+ {Resp, error}
+ end
+ end, ok, Docs),
+ {Status, Resps}
+ end).
+
+
+docs(Db, Docs) ->
+ lists:map(fun(Doc) -> doc(Db, Doc) end, Docs).
+
+
+doc(_Db, #doc{} = Doc) ->
+ Doc;
+
+doc(Db, {_} = Doc) ->
+ couch_db:doc_from_json_obj_validate(Db, Doc);
+
+doc(_Db, Doc) ->
+ erlang:error({illegal_doc_format, Doc}).
+
+
+opts(Options) ->
+ lists:foldl(fun(Opt, Acc) ->
+ add_option(Opt, Acc)
+ end, Options, [user_ctx, io_priority]).
+
+
+add_option(Key, Options) ->
+ case couch_util:get_value(Key, Options) of
+ undefined ->
+ case erlang:get(Key) of
+ undefined ->
+ Options;
+ Value ->
+ [{Key, Value} | Options]
+ end;
+ _ ->
+ Options
end.
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
new file mode 100644
index 000000000..59b56a67a
--- /dev/null
+++ b/src/fabric/src/fabric2_db.erl
@@ -0,0 +1,162 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_db).
+
+
+-export([
+ open/2,
+ init/3,
+
+ name/1,
+
+ with_tx/2,
+
+ pack/2,
+ unpack/2,
+ range_bounds/2,
+
+ get/2,
+ get_range/3,
+ get_range/4,
+ get_range_startswith/2,
+ get_range_startswith/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("fabric/include/fabric.hrl").
+
+
+open(DbName, Options) when is_binary(DbName), is_list(Options) ->
+ BaseDb = #{
+ name => DbName,
+ tx => undefined,
+ dir => undefined,
+ user_ctx => #user_ctx{},
+ validate_doc_update => undefined
+ },
+ lists:foldl(fun({K, V}, DbAcc) ->
+ maps:put(K, V, DbAcc)
+ end, BaseDb, Options).
+
+
+name(#{name := Name}) ->
+ Name.
+
+
+init(DbName, Tx, DbDir) ->
+ TxDb = open(DbName, [{tx, Tx}, {dir, DbDir}]),
+ Defaults = [
+ {{<<"meta">>, <<"config">>, <<"revs_limit">>}, ?uint2bin(1000)},
+ {{<<"meta">>, <<"config">>, <<"security_doc">>}, <<"{}">>},
+ {{<<"meta">>, <<"stats">>, <<"doc_count">>}, ?uint2bin(0)},
+ {{<<"meta">>, <<"stats">>, <<"doc_del_count">>}, ?uint2bin(0)},
+ {{<<"meta">>, <<"stats">>, <<"doc_design_count">>}, ?uint2bin(0)},
+ {{<<"meta">>, <<"stats">>, <<"doc_local_count">>}, ?uint2bin(0)},
+ {{<<"meta">>, <<"stats">>, <<"size">>}, ?uint2bin(2)}
+ ],
+ lists:foreach(fun({K, V}) ->
+ erlfdb:set(Tx, pack(TxDb, K), V)
+ end, Defaults).
+
+
+with_tx(Db, Fun) when is_function(Fun, 1) ->
+ DbName = maps:get(name, Db),
+ DbsDir = fabric_server:get_dir(dbs),
+ fabric_server:transactional(fun(Tx) ->
+ % We'll eventually want to cache this in the
+ % fabric_server ets table.
+ DbDir = try
+ erlfdb_directory:open(Tx, DbsDir, DbName)
+ catch error:{erlfdb_directory, {open_error, path_missing, _}} ->
+ erlang:error(database_does_not_exist)
+ end,
+ TxDb = Db#{tx := Tx, dir := DbDir},
+ Fun(TxDb)
+ end).
+
+
+pack(#{dir := undefined} = Db, _Tuple) ->
+ erlang:error({no_directory, Db});
+
+pack(Db, Tuple) ->
+ #{
+ dir := Dir
+ } = Db,
+ erlfdb_directory:pack(Dir, Tuple).
+
+
+unpack(#{dir := undefined} = Db, _Key) ->
+ erlang:error({no_directory, Db});
+
+unpack(Db, Key) ->
+ #{
+ dir := Dir
+ } = Db,
+ erlfdb_directory:unpack(Dir, Key).
+
+
+range_bounds(#{dir := undefined} = Db, _Key) ->
+ erlang:error({no_directory, Db});
+
+range_bounds(Db, Key) ->
+ #{
+ dir := Dir
+ } = Db,
+ erlfdb_directory:range(Dir, Key).
+
+
+
+get(#{tx := undefined} = Db, _Key) ->
+ erlang:error({invalid_tx_db, Db});
+
+get(Db, Key) ->
+ #{
+ tx := Tx,
+ dir := Dir
+ } = Db,
+ BinKey = erlfdb_directory:pack(Dir, Key),
+ erlfdb:get(Tx, BinKey).
+
+
+get_range(Db, StartKey, EndKey) ->
+ get_range(Db, StartKey, EndKey, []).
+
+
+get_range(#{tx := undefined} = Db, _, _, _) ->
+ erlang:error({invalid_tx_db, Db});
+
+get_range(Db, StartKey, EndKey, Options) ->
+ #{
+ tx := Tx,
+ dir := Dir
+ } = Db,
+ BinStartKey = erlfdb_directory:pack(Dir, StartKey),
+ BinEndKey= erlfdb_directory:pack(Dir, EndKey),
+ erlfdb:get_range(Tx, BinStartKey, BinEndKey, Options).
+
+
+get_range_startswith(Db, Prefix) ->
+ get_range_startswith(Db, Prefix, []).
+
+
+get_range_startswith(#{tx := undefined} = Db, _, _) ->
+ erlang:error({invalid_tx_db, Db});
+
+get_range_startswith(Db, Prefix, Options) ->
+ #{
+ tx := Tx,
+ dir := Dir
+ } = Db,
+ BinPrefix = erlfdb_directory:pack(Dir, Prefix),
+ erlfdb:get_range_startswith(Tx, BinPrefix, Options).
diff --git a/src/fabric/src/fabric2_doc.erl b/src/fabric/src/fabric2_doc.erl
new file mode 100644
index 000000000..042e8a5b9
--- /dev/null
+++ b/src/fabric/src/fabric2_doc.erl
@@ -0,0 +1,447 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_doc).
+
+
+-export([
+ get_fdi/2,
+ open/3,
+ update/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(RETURN(Term), throw({?MODULE, Term})).
+
+
+get_fdi(TxDb, DocId) ->
+ Future = fabric2_db:get(TxDb, {<<"revs">>, DocId}),
+ fdb_to_fdi(DocId, erlfdb:wait(Future)).
+
+
+open(TxDb, DocId, {Pos, [Rev | _] = Path}) ->
+ Key = {<<"docs">>, DocId, Pos, Rev},
+ Future = fabric2_db:get(TxDb, Key),
+ fdb_to_doc(DocId, Pos, Path, erlfdb:wait(Future)).
+
+
+% TODO: Handle _local docs separately.
+update(TxDb, #doc{} = Doc0, Options) ->
+ UpdateType = case lists:member(replicated_changes, Options) of
+ true -> replicated_changes;
+ false -> interactive_edit
+ end,
+
+ try
+ FDI1 = get_fdi(TxDb, Doc0#doc.id),
+ Doc1 = prep_and_validate(TxDb, FDI1, Doc0, UpdateType),
+ Doc2 = case UpdateType of
+ interactive_edit -> new_revid(Doc1);
+ replicated_changes -> Doc1
+ end,
+ {FDI2, Doc3} = merge_rev_tree(FDI1, Doc2, UpdateType),
+
+ #{tx := Tx} = TxDb,
+
+ % Delete old entry in changes feed
+ OldSeqKey = {<<"changes">>, FDI1#full_doc_info.update_seq},
+ OldSeqKeyBin = fabric2_db:pack(TxDb, OldSeqKey),
+ erlfdb:clear(Tx, OldSeqKeyBin),
+
+ % Add new entry to changes feed
+ NewSeqKey = {<<"changes">>, <<16#FFFFFFFFFFFFFFFFFFFF:80>>},
+ NewSeqKeyBin = fabric2_db:pack_vs(TxDb, NewSeqKey),
+ erlfdb:set_versionstamped_key(Tx, NewSeqKeyBin, Doc3#doc.id),
+
+ {RevStart, [Rev | _]} = Doc3#doc.revs,
+
+ % Write document data
+ {NewDocKey, NewDocVal} = doc_to_fdb(TxDb, Doc3),
+ erlfdb:set(Tx, NewDocKey, NewDocVal),
+
+ % Update revision tree entry
+ {NewFDIKey, NewFDIVal} = fdi_to_fdb(TxDb, FDI2),
+ erlfdb:set_versionstampled_value(Tx, NewFDIKey, NewFDIVal),
+
+ {DocIncr, DocDelIncr} = case {FDI1, FDI2} of
+ {not_found, _} ->
+ {1, 0};
+ {#full_doc_info{deleted = true}, #full_doc_info{deleted = false}} ->
+ {1, -1};
+ {#full_doc_info{deleted = false}, #full_doc_info{deleted = true}} ->
+ {-1, 1};
+ _ ->
+ {0, 0}
+ end,
+
+ DocCountKey = {<<"meta">>, <<"stats">>, <<"doc_count">>},
+ DocCountKeyBin = fabric2_db:pack(TxDb, DocCountKey),
+ DocDelCountKey = {<<"meta">>, <<"stats">>, <<"doc_del_count">>},
+ DocDelCountKeyBin = fabric2_db:pack(TxDb, DocDelCountKey),
+
+ if DocIncr == 0 -> ok; true ->
+ erlfdb:add(Tx, DocCountKeyBin, DocIncr)
+ end,
+
+ if DocDelIncr == 0 -> ok; true ->
+ erlfdb:add(Tx, DocDelCountKeyBin, DocDelIncr)
+ end,
+
+ % TODO: Count design docs separately
+
+ % And done.
+ {ok, {RevStart, Rev}}
+ catch throw:{?MODULE, Return} ->
+ Return
+ end.
+
+
+prep_and_validate(TxDb, not_found, Doc, UpdateType) ->
+ case Doc#doc.revs of
+ {0, []} when UpdateType == interactive_edit ->
+ ?RETURN({error, conflict});
+ _ ->
+ ok
+ end,
+ prep_and_validate(TxDb, Doc, fun() -> nil end);
+
+prep_and_validate(TxDb, FDI, Doc, interactive_edit) ->
+ #doc{
+ revs = {Start, Revs}
+ } = Doc,
+
+ Leafs = couch_key_tree:get_all_leafs(FDI#full_doc_info.rev_tree),
+ LeafRevs = lists:map(fun({_Leaf, {LeafStart, [LeafRev | _] = Path}}) ->
+ {{LeafStart, LeafRev}, Path}
+ end, Leafs),
+
+ GetDocFun = case Revs of
+ [PrevRev | _] ->
+ case lists:keyfind({Start, PrevRev}, 1, LeafRevs) of
+ {{Start, PrevRev}, Path} ->
+ fun() -> open(TxDb, Doc#doc.id, {Start, Path}) end;
+ false ->
+ ?RETURN({error, conflict})
+ end;
+ [] ->
+ case FDI#full_doc_info.deleted of
+ true ->
+ fun() -> nil end;
+ false ->
+ ?RETURN({error, conflict})
+ end
+ end,
+ prep_and_validate(TxDb, Doc, GetDocFun);
+
+prep_and_validate(TxDb, FDI, Doc, replicated_changes) ->
+ #full_doc_info{
+ rev_tree = RevTree
+ } = FDI,
+ OldLeafs = couch_key_tree:get_all_leafs_full(RevTree),
+ OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _} | _]} <- OldLeafs],
+
+ NewPath = couch_doc:to_path(Doc),
+ NewRevTree = couch_key_tree:merge(RevTree, NewPath),
+
+ Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
+ LeafRevsFull = lists:map(fun({Start, [{RevId, _} | _]} = FullPath) ->
+ [{{Start, RevId}, FullPath}]
+ end, Leafs),
+ LeafRevsFullDict = dict:from_list(LeafRevsFull),
+
+ #doc{
+ revs = {DocStart, [DocRev | _]}
+ } = Doc,
+ DocRevId = {DocStart, DocRev},
+
+ IsOldLeaf = lists:member(DocRevId, OldLeafsLU),
+ GetDocFun = case dict:find(DocRevId, LeafRevsFullDict) of
+ {ok, {DocStart, RevPath}} when not IsOldLeaf ->
+ % An incoming replicated edit only sends us leaf
+ % nodes which may have included multiple updates
+ % we haven't seen locally. Thus we have to search
+ % back through the tree to find the first edit
+ % we do know about.
+ case find_prev_known_rev(DocStart, RevPath) of
+ not_found -> fun() -> nil end;
+ PrevRevs -> fun() -> open(TxDb, Doc#doc.id, PrevRevs) end
+ end;
+ _ ->
+ % The update merged to an internal node that we
+ % already know about which means we're done with
+ % this update.
+ ?RETURN({ok, []})
+ end,
+
+ prep_and_validate(TxDb, Doc, GetDocFun).
+
+
+prep_and_validate(TxDb, Doc, GetDocBody) ->
+ NewDoc = case couch_doc:has_stubs(Doc) of
+ true ->
+ case GetDocBody() of
+ #doc{} = PrevDoc ->
+ couch_doc:merge_stubs(Doc, PrevDoc);
+ _ ->
+ % Force a missing stubs error
+ couch_doc:mege_stubs(Doc, #doc{})
+ end;
+ false ->
+ Doc
+ end,
+ validate_doc_update(TxDb, NewDoc, GetDocBody),
+ NewDoc.
+
+
+merge_rev_tree(FDI, Doc, interactive_edit) when FDI#full_doc_info.deleted ->
+ % We're recreating a document that was previously
+ % deleted. To check that this is a recreation from
+ % the root we assert that the new document has a
+ % revision depth of 1 (this is to avoid recreating a
+ % doc from a previous internal revision) and is also
+ % not deleted. To avoid expanding the revision tree
+ % unnecessarily we create a new revision based on
+ % the winning deleted revision.
+
+ {RevDepth, _} = Doc#doc.revs,
+ case RevDepth == 1 andalso not Doc#doc.deleted of
+ true ->
+ % Update the new doc based on revisions in OldInfo
+ #doc_info{revs=[WinningRev | _]} = couch_doc:to_doc_info(FDI),
+ #rev_info{rev={OldPos, OldRev}} = WinningRev,
+ Body = case couch_util:get_value(comp_body, Doc#doc.meta) of
+ CompBody when is_binary(CompBody) ->
+ couch_compress:decompress(CompBody);
+ _ ->
+ Doc#doc.body
+ end,
+ NewDoc = new_revid(Doc#doc{
+ revs = {OldPos, [OldRev]},
+ body = Body
+ }),
+
+ % Merge our modified new doc into the tree
+ #full_doc_info{rev_tree = RevTree} = FDI,
+ case couch_key_tree:merge(RevTree, couch_doc:to_path(NewDoc)) of
+ {NewRevTree, new_leaf} ->
+ % We changed the revision id so inform the caller
+ NewFDI = FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ deleted = false
+ },
+ {NewFDI, NewDoc};
+ _ ->
+ throw(doc_recreation_failed)
+ end;
+ _ ->
+ ?RETURN({error, conflict})
+ end;
+merge_rev_tree(FDI, Doc, interactive_edit) ->
+ % We're attempting to merge a new revision into an
+ % undeleted document. To not be a conflict we require
+ % that the merge results in extending a branch.
+
+ RevTree = FDI#full_doc_info.rev_tree,
+ case couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)) of
+ {NewRevTree, new_leaf} when not Doc#doc.deleted ->
+ NewFDI = FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ deleted = false
+ },
+ {NewFDI, Doc};
+ {NewRevTree, new_leaf} when Doc#doc.deleted ->
+ % We have to check if we just deleted this
+ % document completely or if it was a conflict
+ % resolution.
+ NewFDI = FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ deleted = couch_doc:is_deleted(NewRevTree)
+ },
+ {NewFDI, Doc};
+ _ ->
+ ?RETURN({error, conflict})
+ end;
+merge_rev_tree(FDI, Doc, replicated_changes) ->
+ % We're merging in revisions without caring about
+ % conflicts. Most likely this is a replication update.
+ RevTree = FDI#full_doc_info.rev_tree,
+ {NewRevTree, _} = couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)),
+ NewFDI = FDI#full_doc_info{
+ rev_tree = NewRevTree,
+ deleted = couch_doc:is_deleted(NewRevTree)
+ },
+ % If a replicated change did not modify the revision
+ % tree then we've got nothing else to do.
+ if NewFDI /= FDI -> ok; true ->
+ ?RETURN({ok, []})
+ end,
+ {NewFDI, Doc}.
+
+
+
+validate_doc_update(TxDb, #doc{id = <<"_design/", _/binary>>} = Doc, _) ->
+ case catch fabric2_db:check_is_admin(TxDb) of
+ ok -> validate_ddoc(TxDb, Doc);
+ Error -> ?RETURN(Error)
+ end;
+validate_doc_update(_TxDb, #doc{id = <<"_local/", _/binary>>}, _) ->
+ ok;
+validate_doc_update(TxDb, Doc, GetDiskDocFun) ->
+ Fun = fun() ->
+ DiskDoc = GetDiskDocFun(),
+ JsonCtx = couch_util:json_user_ctx(TxDb),
+ SecObj = fabric2_db:get_security(TxDb),
+ try
+ lists:map(fun(VDU) ->
+ case VDU(Doc, DiskDoc, JsonCtx, SecObj) of
+ ok -> ok;
+ Error -> ?RETURN(Error)
+ end
+ end, fabric2_db:get_vdus(TxDb)),
+ ok
+ catch
+ throw:Error ->
+ Error
+ end
+ end,
+ Stat = [couchdb, query_server, vdu_process_time],
+ couch_stats:update_histogram(Stat, Fun).
+
+
+validate_ddoc(TxDb, DDoc) ->
+ try
+ ok = couch_index_server:validate(TxDb, couch_doc:with_ejson_body(DDoc))
+ catch
+ throw:{invalid_design_doc, Reason} ->
+ throw({bad_request, invalid_design_doc, Reason});
+ throw:{compilation_error, Reason} ->
+ throw({bad_request, compilation_error, Reason});
+ throw:Error ->
+ ?RETURN(Error)
+ end.
+
+
+find_prev_known_rev(_Pos, []) ->
+ not_found;
+find_prev_known_rev(Pos, [{_Rev, #doc{}} | RestPath]) ->
+ % doc records are skipped because these are the result
+ % of replication sending us an update. We're only interested
+ % in what we have locally since we're comparing attachment
+ % stubs. The replicator should never do this because it
+ % should only ever send leaves but the possibility exists
+ % so we account for it.
+ find_prev_known_rev(Pos - 1, RestPath);
+find_prev_known_rev(Pos, [{_Rev, ?REV_MISSING} | RestPath]) ->
+ find_prev_known_rev(Pos - 1, RestPath);
+find_prev_known_rev(Pos, [{_Rev, #leaf{}} | _] = DocPath) ->
+ {Pos, [Rev || {Rev, _Val} <- DocPath]}.
+
+
+new_revid(Doc) ->
+ #doc{
+ body = Body,
+ revs = {OldStart, OldRevs},
+ atts = Atts,
+ deleted = Deleted
+ } = Doc,
+
+ DigestedAtts = lists:foldl(fun(Att, Acc) ->
+ [N, T, M] = couch_att:fetch([name, type, md5], Att),
+ case M == <<>> of
+ true -> Acc;
+ false -> [{N, T, M} | Acc]
+ end
+ end, [], Atts),
+
+ Rev = case DigestedAtts of
+ Atts2 when length(Atts) =/= length(Atts2) ->
+ % We must have old style non-md5 attachments
+ list_to_binary(integer_to_list(couch_util:rand32()));
+ Atts2 ->
+ OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end,
+ SigTerm = [Deleted, OldStart, OldRev, Body, Atts2],
+ couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}]))
+ end,
+
+ Doc#doc{revs = {OldStart + 1, [Rev | OldRevs]}}.
+
+
+doc_to_fdb(TxDb, #doc{} = Doc) ->
+ #doc{
+ id = Id,
+ revs = {Start, [Rev | _]},
+ body = Body,
+ atts = Atts,
+ deleted = Deleted
+ } = Doc,
+ Key = {<<"docs">>, Id, Start, Rev},
+ KeyBin = fabric2_db:pack(TxDb, Key),
+ Val = {Body, Atts, Deleted},
+ {KeyBin, term_to_binary(Val, [{minor_version, 1}])}.
+
+
+fdb_to_doc(DocId, Pos, Path, Bin) when is_binary(Bin) ->
+ {Body, Atts, Deleted} = binary_to_term(Bin, [safe]),
+ #doc{
+ id = DocId,
+ revs = {Pos, Path},
+ body = Body,
+ atts = Atts,
+ deleted = Deleted
+ };
+fdb_to_doc(_DocId, _Pos, _Path, not_found) ->
+ {not_found, missing}.
+
+
+fdi_to_fdb(TxDb, #full_doc_info{} = FDI) ->
+ #full_doc_info{
+ id = Id,
+ deleted = Deleted,
+ rev_tree = RevTree
+ } = flush_tree(FDI),
+
+ Key = {<<"revs">>, Id},
+ KeyBin = fabric2_db:pack(TxDb, Key),
+ Val = {Deleted, RevTree, <<16#FFFFFFFFFFFFFFFFFFFF:80>>},
+ {KeyBin, term_to_binary(Val, [{minor_version, 1}])}.
+
+
+fdb_to_fdi(Id, Bin) when is_binary(Bin) ->
+ {Deleted, RevTree, UpdateSeq} = binary_to_term(Bin, [safe]),
+ #full_doc_info{
+ id = Id,
+ deleted = Deleted,
+ rev_tree = RevTree,
+ update_seq = UpdateSeq
+ };
+fdb_to_fdi(_Id, not_found) ->
+ not_found.
+
+
+flush_tree(FDI) ->
+ #full_doc_info{
+ rev_tree = Unflushed
+ } = FDI,
+
+ Flushed = couch_key_tree:map(fun(_Rev, Value) ->
+ case Value of
+ #doc{deleted = Del} -> #leaf{deleted = Del};
+ _ -> Value
+ end
+ end, Unflushed),
+
+ FDI#full_doc_info{
+ rev_tree = Flushed
+ }.