diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-03-08 16:34:43 -0600 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-03-08 17:15:44 -0600 |
commit | b24a593613ab64fde8087fbe551ee5adc0ce97f7 (patch) | |
tree | cb3db58c0f5810db4f426e335e4fc1cd6b2f38e6 | |
parent | 730d671a8405fc250942e8e6241080b8e140dc63 (diff) | |
download | couchdb-b24a593613ab64fde8087fbe551ee5adc0ce97f7.tar.gz |
Implement update_docs
This is a first pass attempt at an implementation of update_docs.
There's probably a lot wrong here and a number of things are expected to
blow up if features like VDUs are used.
-rw-r--r-- | FDB_NOTES.md | 45 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_db.erl | 43 | ||||
-rw-r--r-- | src/fabric/src/fabric2.erl | 237 | ||||
-rw-r--r-- | src/fabric/src/fabric2_db.erl | 162 | ||||
-rw-r--r-- | src/fabric/src/fabric2_doc.erl | 447 |
5 files changed, 821 insertions, 113 deletions
diff --git a/FDB_NOTES.md b/FDB_NOTES.md new file mode 100644 index 000000000..9278a2f0f --- /dev/null +++ b/FDB_NOTES.md @@ -0,0 +1,45 @@ +Things of Note +=== + + +1. If a replication sends us two revisions A and B where one is an + ancestor of the other, we likely have divergent behavior. However, + this should never happen In Theory. + +2. For multiple updates to the same document in a _bulk_docs request (or if they + just happen to be in the same update batch in non-fdb CouchDB) + we likely have subtly different behavior. + +3. I'm relying on repeated reads in an fdb transaction to be "cheap" + in that the reads would be cached in the fdb_transaction object. + This needs to be checked for certainty but that appeared to + be how things behaved in testing. + +4. When attempting to create a doc from scratch in an interactive_edit + update, with revisions specified *and* attachment stubs, the reported + error is now a conflict. Previously the missing_stubs error was + raised earlier. + +5. There may be a difference in behavior if there are no VDU functions + set on a db and no design documents in a batch. This is because in + this situation we don't run the prep_and_validate code on pre-fdb + CouchDB. The new code always checks stubs before merging revision trees. + I'm sure the old way would fail somehow, but it would fail further on + which means we may have failed with a different reason (conflict, etc) + before we got to the next place we check for missing stubs. + +6. I still need to add paging for the DirectoryLayer results so that we + can use that for the _all_dbs end point. + +7. For multi-doc updates we'll need to investigate user versions on + versionstamps within a transaction. Also this likely prevents the + ability to have multiple updates to the same doc in a single + _bulk_docs transaction + +8. I'm not currently decreasing size after updating an existing document. 
+ It looks like we're gonna have to do some sort of cleverness with the + existing FDI sizes field maybe? + +9. I'm cheating really bad with term_to_binary and ignoring serialization + but given that's all going to change I'm not too concerned about it + at this point.
\ No newline at end of file diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index cceeac0d7..b299ef2d0 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -370,16 +370,13 @@ delete_db_req(#httpd{}=Req, DbName) -> end. do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) -> - Db = #{ - name => DbName, - user_ctx => Ctx - }, + Db = fabric2:open_db(DbName, [{user_ctx, Ctx}]), Fun(Req, Db). -db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) -> +db_req(#httpd{method='GET',path_parts=[DbName]}=Req, Db) -> % measure the time required to generate the etag, see if it's worth it T0 = os:timestamp(), - {ok, DbInfo} = fabric2:get_db_info(DbName), + {ok, DbInfo} = fabric2:get_db_info(Db), DeltaT = timer:now_diff(os:timestamp(), T0) / 1000, couch_stats:update_histogram([couchdb, dbinfo], DeltaT), send_json(Req, {DbInfo}); @@ -402,7 +399,7 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) -> "ok" -> % async_batching spawn(fun() -> - case catch(fabric:update_doc(Db, Doc2, Options)) of + case catch(fabric2:update_doc(Db, Doc2, Options)) of {ok, _} -> chttpd_stats:incr_writes(), ok; @@ -422,7 +419,7 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) -> % normal DocUrl = absolute_uri(Req, [$/, couch_util:url_encode(DbName), $/, couch_util:url_encode(DocId)]), - case fabric:update_doc(Db, Doc2, Options) of + case fabric2:update_doc(Db, Doc2, Options) of {ok, NewRev} -> chttpd_stats:incr_writes(), HttpCode = 201; @@ -497,7 +494,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>], user_ctx=Ctx}=Req, true -> [all_or_nothing|Options]; _ -> Options end, - case fabric:update_docs(Db, Docs, Options2) of + case fabric2:update_docs(Db, Docs, Options2) of {ok, Results} -> % output the results chttpd_stats:incr_writes(length(Results)), @@ -516,7 +513,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>], user_ctx=Ctx}=Req, send_json(Req, 417, 
ErrorsJson) end; false -> - case fabric:update_docs(Db, Docs, [replicated_changes|Options]) of + case fabric2:update_docs(Db, Docs, [replicated_changes|Options]) of {ok, Errors} -> chttpd_stats:incr_writes(length(Docs)), ErrorsJson = lists:map(fun update_doc_result_to_json/1, Errors), @@ -960,7 +957,7 @@ db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) -> NewDoc = Doc#doc{ atts = UpdatedAtts ++ OldAtts2 }, - case fabric:update_doc(Db, NewDoc, Options) of + case fabric2:update_doc(Db, NewDoc, Options) of {ok, NewRev} -> chttpd_stats:incr_writes(), HttpCode = 201; @@ -978,11 +975,10 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) -> #doc_query_args{ update_type = UpdateType } = parse_doc_query(Req), - DbName = couch_db:name(Db), - couch_db:validate_docid(Db, DocId), + DbName = fabric2_db:name(Db), + couch_doc:validate_docid(DocId), - W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), - Options = [{user_ctx,Ctx}, {w,W}], + Options = [{user_ctx, Ctx}], Loc = absolute_uri(Req, [$/, couch_util:url_encode(DbName), $/, couch_util:url_encode(DocId)]), @@ -1010,7 +1006,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) -> Doc = couch_doc_from_req(Req, Db, DocId, chttpd:json_body(Req)), spawn(fun() -> - case catch(fabric:update_doc(Db, Doc, Options)) of + case catch(fabric2:update_doc(Db, Doc, Options)) of {ok, _} -> chttpd_stats:incr_writes(), ok; @@ -1044,7 +1040,7 @@ db_doc_req(#httpd{method='COPY', user_ctx=Ctx}=Req, Db, SourceDocId) -> % open old doc Doc = couch_doc_open(Db, SourceDocId, SourceRev, []), % save new doc - case fabric:update_doc(Db, + case fabric2:update_doc(Db, Doc#doc{id=TargetDocId, revs=TargetRevs}, [{user_ctx,Ctx}]) of {ok, NewTargetRev} -> chttpd_stats:incr_writes(), @@ -1241,15 +1237,14 @@ send_updated_doc(Req, Db, DocId, Doc, Headers) -> send_updated_doc(#httpd{user_ctx=Ctx} = Req, Db, DocId, #doc{deleted=Deleted}=Doc, Headers, UpdateType) -> - W = chttpd:qs_value(Req, "w", 
integer_to_list(mem3:quorum(Db))), Options = case couch_httpd:header_value(Req, "X-Couch-Full-Commit") of "true" -> - [full_commit, UpdateType, {user_ctx,Ctx}, {w,W}]; + [full_commit, UpdateType, {user_ctx,Ctx}]; "false" -> - [delay_commit, UpdateType, {user_ctx,Ctx}, {w,W}]; + [delay_commit, UpdateType, {user_ctx,Ctx}]; _ -> - [UpdateType, {user_ctx,Ctx}, {w,W}] + [UpdateType, {user_ctx,Ctx}] end, {Status, {etag, Etag}, Body} = update_doc(Db, DocId, #doc{deleted=Deleted}=Doc, Options), @@ -1269,7 +1264,7 @@ http_code_from_status(Status) -> update_doc(Db, DocId, #doc{deleted=Deleted, body=DocBody}=Doc, Options) -> {_, Ref} = spawn_monitor(fun() -> - try fabric:update_doc(Db, Doc, Options) of + try fabric2:update_doc(Db, Doc, Options) of Resp -> exit({exit_ok, Resp}) catch @@ -1339,7 +1334,7 @@ couch_doc_from_req(Req, _Db, DocId, #doc{revs=Revs} = Doc) -> end, Doc#doc{id=DocId, revs=Revs2}; couch_doc_from_req(Req, Db, DocId, Json) -> - Doc = couch_db:doc_from_json_obj_validate(Db, Json), + Doc = couch_doc:from_json_obj_validate(Json, fabric2_db:name(Db)), couch_doc_from_req(Req, Db, DocId, Doc). 
@@ -1551,7 +1546,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa atts = NewAtt ++ [A || A <- Atts, couch_att:fetch(name, A) /= FileName] }, W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), - case fabric:update_doc(Db, DocEdited, [{user_ctx,Ctx}, {w,W}]) of + case fabric2:update_doc(Db, DocEdited, [{user_ctx,Ctx}, {w,W}]) of {ok, UpdatedRev} -> chttpd_stats:incr_writes(), HttpCode = 201; diff --git a/src/fabric/src/fabric2.erl b/src/fabric/src/fabric2.erl index 924fb8979..59b27d543 100644 --- a/src/fabric/src/fabric2.erl +++ b/src/fabric/src/fabric2.erl @@ -24,6 +24,9 @@ delete_db/1, delete_db/2, + open_db/1, + open_db/2, + get_db_info/1, get_doc_count/1, get_doc_count/2, @@ -32,7 +35,7 @@ %% set_revs_limit/3, get_security/1, - set_security/2 + set_security/2, %% get_purge_infos_limit/1, %% set_purge_infos_limit/3, @@ -51,9 +54,10 @@ %% %% get_missing_revs/2, %% get_missing_revs/3, - %% - %% update_doc/3, - %% update_docs/3, + + update_doc/3, + update_docs/3 + %% purge_docs/3, %% %% att_receiver/2, @@ -100,7 +104,7 @@ create_db(DbName, _Options) -> fabric_server:transactional(fun(Tx) -> try DbDir = erlfdb_directory:create(Tx, DbsDir, DbName), - init_db(Tx, DbDir) + fabric2_db:init(DbName, Tx, DbDir) catch error:{erlfdb_directory, {create_error, path_exists, _}} -> {error, file_exists} end @@ -122,60 +126,76 @@ delete_db(DbName, _Options) -> end. -get_db_info(DbName) -> - [DbDir, MetaRows, LastChangeRow] = fabric_server:transactional(fun(Tx) -> - DbDir = open_db(Tx, DbName), - Meta = erlfdb_directory:pack(DbDir, {<<"meta">>, <<"stats">>}), - MetaFuture = erlfdb:get_range_startswith(Tx, Meta), - {CStart, CEnd} = erlfdb_directory:range(DbDir, {<<"changes">>}), - ChangesFuture = erlfdb:get_range(Tx, CStart, CEnd, [ - {streaming_mode, exact}, - {limit, 1}, - {reverse, true} - ]), - [DbDir] ++ erlfdb:wait_for_all([MetaFuture, ChangesFuture]) +open_db(DbName) -> + open_db(DbName, []). 
+ + +open_db(DbName, Options) -> + fabric2_db:open(DbName, Options). + + +get_db_info(DbName) when is_binary(DbName) -> + get_db_info(open_db(DbName)); + +get_db_info(Db) -> + DbProps = fabric2_db:with_tx(Db, fun(TxDb) -> + {CStart, CEnd} = fabric2_db:range_bounds(TxDb, {<<"changes">>}), + ChangesFuture = fabric2_db:get_range(TxDb, {CStart}, {CEnd}, [ + {streaming_mode, exact}, + {limit, 1}, + {reverse, true} + ]), + + StatsPrefix = {<<"meta">>, <<"stats">>}, + MetaFuture = fabric2_db:get_range_startswith(TxDb, StatsPrefix), + + RawSeq = case erlfdb:wait(ChangesFuture) of + [] -> + <<0:80>>; + [{SeqKey, _}] -> + {<<"changes">>, SeqBin} = fabric2_db:unpack(TxDb, SeqKey), + SeqBin + end, + CProp = {update_seq, ?l2b(couch_util:to_hex(RawSeq))}, + + MProps = lists:flatmap(fun({K, V}) -> + case fabric2_db:unpack(TxDb, K) of + {_, _, <<"doc_count">>} -> + [{doc_count, ?bin2uint(V)}]; + {_, _, <<"doc_del_count">>} -> + [{doc_del_count, ?bin2uint(V)}]; + {_, _, <<"size">>} -> + Val = ?bin2uint(V), + [ + {other, {[{data_size, Val}]}}, + {sizes, {[ + {active, 0}, + {external, Val}, + {file, 0} + ]}} + ]; + _ -> + [] + end + end, erlfdb:wait(MetaFuture)), + + [CProp | MProps] end), + BaseProps = [ {cluster, {[{n, 0}, {q, 0}, {r, 0}, {w, 0}]}}, {compact_running, false}, {data_size, 0}, - {db_name, DbName}, + {db_name, fabric2_db:name(Db)}, {disk_format_version, 0}, {disk_size, 0}, {instance_start_time, <<"0">>}, {purge_seq, 0} ], - WithMeta = lists:foldl(fun({KBin, VBin}, PropAcc) -> - case erlfdb_directory:unpack(DbDir, KBin) of - {_, _, <<"doc_count">>} -> - Val = ?bin2uint(VBin), - lists:keystore(doc_count, 1, PropAcc, {doc_count, Val}); - {_, _, <<"doc_del_count">>} -> - Val = ?bin2uint(VBin), - lists:keystore(doc_del_count, 1, PropAcc, {doc_del_count, Val}); - {_, _, <<"size">>} -> - Val = ?bin2uint(VBin), - Other = {other, {[{data_size, Val}]}}, - Sizes = {sizes, {[ - {active, 0}, - {external, Val}, - {file, 0} - ]}}, - PA1 = lists:keystore(other, 1, PropAcc, Other), - 
lists:keystore(sizes, 1, PA1, Sizes); - _ -> - PropAcc - end - end, BaseProps, MetaRows), - RawSeq = case LastChangeRow of - [] -> - <<0:80>>; - [{KBin, _}] -> - {<<"changes">>, SeqBin} = erlfdb_directory:unpack(DbDir, KBin), - SeqBin - end, - Seq = ?l2b(couch_util:to_hex(RawSeq)), - {ok, lists:keystore(update_seq, 1, WithMeta, {update_seq, Seq})}. + + {ok, lists:foldl(fun({Key, Val}, Acc) -> + lists:keystore(Key, 1, Acc, {Key, Val}) + end, BaseProps, DbProps)}. get_doc_count(DbName) -> @@ -191,57 +211,96 @@ get_doc_count(DbName, <<"_design">>) -> get_doc_count(DbName, <<"_local">>) -> get_doc_count(DbName, <<"doc_local_count">>); -get_doc_count(DbName, Key) -> - fabric_server:transactional(fun(Tx) -> - DbDir = open_db(Tx, DbName), - Key = erlfdb_directory:pack(DbDir, {<<"meta">>, <<"stats">>, Key}), - VBin = erlfdb:wait(erlfdb:get(Tx, Key)), - ?bin2uint(VBin) +get_doc_count(Db, Key) -> + fabric2_db:with_tx(Db, fun(TxDb) -> + Future = fabric2_db:get(TxDb, {<<"meta">>, <<"stats">>, Key}), + ?bin2uint(erlfdb:wait(Future)) end). -get_security(DbName) -> - SecJson = fabric_server:transactional(fun(Tx) -> - DbDir = open_db(Tx, DbName), - Tuple = {<<"meta">>, <<"config">>, <<"security_doc">>}, - Key = erlfdb_directory:pack(DbDir, Tuple), - erlfdb:wait(erlfdb:get(Tx, Key)) +get_security(DbName) when is_binary(DbName) -> + get_security(open_db(DbName)); + +get_security(Db) -> + Key = {<<"meta">>, <<"config">>, <<"security_doc">>}, + SecJson = fabric2_db:with_tx(Db, fun(TxDb) -> + erlfdb:wait(fabric2_db:get(TxDb, Key)) end), ?JSON_DECODE(SecJson). 
-set_security(DbName, ErlJson) -> +set_security(DbName, ErlJson) when is_binary(DbName) -> + set_security(open_db(DbName), ErlJson); + +set_security(Db, ErlJson) -> + Key = {<<"meta">>, <<"config">>, <<"security_doc">>}, SecJson = ?JSON_ENCODE(ErlJson), - fabric_server:transactional(fun(Tx) -> - DbDir = open_db(Tx, DbName), - Tuple = {<<"meta">>, <<"config">>, <<"security_doc">>}, - Key = erlfdb_directory:pack(DbDir, Tuple), - erlfdb:set(Tx, Key, SecJson) + fabric2_db:with_tx(Db, fun(TxDb) -> + fabric2_db:set(TxDb, Key, SecJson) end). -init_db(Tx, DbDir) -> - Defaults = [ - {{<<"meta">>, <<"config">>, <<"revs_limit">>}, ?uint2bin(1000)}, - {{<<"meta">>, <<"config">>, <<"security_doc">>}, ?DEFAULT_SECURITY}, - {{<<"meta">>, <<"stats">>, <<"doc_count">>}, ?uint2bin(0)}, - {{<<"meta">>, <<"stats">>, <<"doc_del_count">>}, ?uint2bin(0)}, - {{<<"meta">>, <<"stats">>, <<"doc_design_count">>}, ?uint2bin(0)}, - {{<<"meta">>, <<"stats">>, <<"doc_local_count">>}, ?uint2bin(0)}, - {{<<"meta">>, <<"stats">>, <<"size">>}, ?uint2bin(2)} - ], - lists:foreach(fun({K, V}) -> - BinKey = erlfdb_directory:pack(DbDir, K), - erlfdb:set(Tx, BinKey, V) - end, Defaults). +update_doc(Db, Doc, Options) -> + fabric2_db:with_tx(Db, fun(TxDb) -> + case fabric2_doc:update(TxDb, Doc, opts(Options)) of + {ok, []} -> + % replication no-op + #doc{revs = {Pos, [RevId | _]}} = doc(Db, Doc), + {ok, {Pos, RevId}}; + {ok, NewRev} -> + {ok, NewRev}; + {error, Error} -> + throw(Error) + end + end). -open_db(Tx, DbName) -> - % We'll eventually want to cache this in the - % fabric_server ets table. 
- DbsDir = fabric_server:get_dir(dbs), - try - erlfdb_directory:open(Tx, DbsDir, DbName) - catch error:{erlfdb_directory, {open_error, path_missing, _}} -> - erlang:error(database_does_not_exist) +update_docs(DbName, Docs, Options) when is_binary(DbName) -> + update_docs(open_db(DbName, Options), Docs, Options); + +update_docs(Db, Docs, Options) -> + fabric2_db:with_tx(Db, fun(TxDb) -> + {Resps, Status} = lists:mapfoldl(fun(Doc, Acc) -> + case fabric2_doc:update(TxDb, Doc, opts(Options)) of + {ok, _} = Resp -> + {Resp, Acc}; + {error, _} = Resp -> + {Resp, error} + end + end, ok, Docs), + {Status, Resps} + end). + + +docs(Db, Docs) -> + lists:map(fun(Doc) -> doc(Db, Doc) end, Docs). + + +doc(_Db, #doc{} = Doc) -> + Doc; + +doc(Db, {_} = Doc) -> + couch_db:doc_from_json_obj_validate(Db, Doc); + +doc(_Db, Doc) -> + erlang:error({illegal_doc_format, Doc}). + + +opts(Options) -> + lists:foldl(fun(Opt, Acc) -> + add_option(Opt, Acc) + end, Options, [user_ctx, io_priority]). + + +add_option(Key, Options) -> + case couch_util:get_value(Key, Options) of + undefined -> + case erlang:get(Key) of + undefined -> + Options; + Value -> + [{Key, Value} | Options] + end; + _ -> + Options end. diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl new file mode 100644 index 000000000..59b56a67a --- /dev/null +++ b/src/fabric/src/fabric2_db.erl @@ -0,0 +1,162 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_db). 
+ + +-export([ + open/2, + init/3, + + name/1, + + with_tx/2, + + pack/2, + unpack/2, + range_bounds/2, + + get/2, + get_range/3, + get_range/4, + get_range_startswith/2, + get_range_startswith/3 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("fabric/include/fabric.hrl"). + + +open(DbName, Options) when is_binary(DbName), is_list(Options) -> + BaseDb = #{ + name => DbName, + tx => undefined, + dir => undefined, + user_ctx => #user_ctx{}, + validate_doc_update => undefined + }, + lists:foldl(fun({K, V}, DbAcc) -> + maps:put(K, V, DbAcc) + end, BaseDb, Options). + + +name(#{name := Name}) -> + Name. + + +init(DbName, Tx, DbDir) -> + TxDb = open(DbName, [{tx, Tx}, {dir, DbDir}]), + Defaults = [ + {{<<"meta">>, <<"config">>, <<"revs_limit">>}, ?uint2bin(1000)}, + {{<<"meta">>, <<"config">>, <<"security_doc">>}, <<"{}">>}, + {{<<"meta">>, <<"stats">>, <<"doc_count">>}, ?uint2bin(0)}, + {{<<"meta">>, <<"stats">>, <<"doc_del_count">>}, ?uint2bin(0)}, + {{<<"meta">>, <<"stats">>, <<"doc_design_count">>}, ?uint2bin(0)}, + {{<<"meta">>, <<"stats">>, <<"doc_local_count">>}, ?uint2bin(0)}, + {{<<"meta">>, <<"stats">>, <<"size">>}, ?uint2bin(2)} + ], + lists:foreach(fun({K, V}) -> + erlfdb:set(Tx, pack(TxDb, K), V) + end, Defaults). + + +with_tx(Db, Fun) when is_function(Fun, 1) -> + DbName = maps:get(name, Db), + DbsDir = fabric_server:get_dir(dbs), + fabric_server:transactional(fun(Tx) -> + % We'll eventually want to cache this in the + % fabric_server ets table. + DbDir = try + erlfdb_directory:open(Tx, DbsDir, DbName) + catch error:{erlfdb_directory, {open_error, path_missing, _}} -> + erlang:error(database_does_not_exist) + end, + TxDb = Db#{tx := Tx, dir := DbDir}, + Fun(TxDb) + end). + + +pack(#{dir := undefined} = Db, _Tuple) -> + erlang:error({no_directory, Db}); + +pack(Db, Tuple) -> + #{ + dir := Dir + } = Db, + erlfdb_directory:pack(Dir, Tuple). 
+ + +unpack(#{dir := undefined} = Db, _Key) -> + erlang:error({no_directory, Db}); + +unpack(Db, Key) -> + #{ + dir := Dir + } = Db, + erlfdb_directory:unpack(Dir, Key). + + +range_bounds(#{dir := undefined} = Db, _Key) -> + erlang:error({no_directory, Db}); + +range_bounds(Db, Key) -> + #{ + dir := Dir + } = Db, + erlfdb_directory:range(Dir, Key). + + + +get(#{tx := undefined} = Db, _Key) -> + erlang:error({invalid_tx_db, Db}); + +get(Db, Key) -> + #{ + tx := Tx, + dir := Dir + } = Db, + BinKey = erlfdb_directory:pack(Dir, Key), + erlfdb:get(Tx, BinKey). + + +get_range(Db, StartKey, EndKey) -> + get_range(Db, StartKey, EndKey, []). + + +get_range(#{tx := undefined} = Db, _, _, _) -> + erlang:error({invalid_tx_db, Db}); + +get_range(Db, StartKey, EndKey, Options) -> + #{ + tx := Tx, + dir := Dir + } = Db, + BinStartKey = erlfdb_directory:pack(Dir, StartKey), + BinEndKey= erlfdb_directory:pack(Dir, EndKey), + erlfdb:get_range(Tx, BinStartKey, BinEndKey, Options). + + +get_range_startswith(Db, Prefix) -> + get_range_startswith(Db, Prefix, []). + + +get_range_startswith(#{tx := undefined} = Db, _, _) -> + erlang:error({invalid_tx_db, Db}); + +get_range_startswith(Db, Prefix, Options) -> + #{ + tx := Tx, + dir := Dir + } = Db, + BinPrefix = erlfdb_directory:pack(Dir, Prefix), + erlfdb:get_range_startswith(Tx, BinPrefix, Options). diff --git a/src/fabric/src/fabric2_doc.erl b/src/fabric/src/fabric2_doc.erl new file mode 100644 index 000000000..042e8a5b9 --- /dev/null +++ b/src/fabric/src/fabric2_doc.erl @@ -0,0 +1,447 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_doc). + + +-export([ + get_fdi/2, + open/3, + update/3 +]). + + +-include_lib("couch/include/couch_db.hrl"). + + +-define(RETURN(Term), throw({?MODULE, Term})). + + +get_fdi(TxDb, DocId) -> + Future = fabric2_db:get(TxDb, {<<"revs">>, DocId}), + fdb_to_fdi(DocId, erlfdb:wait(Future)). + + +open(TxDb, DocId, {Pos, [Rev | _] = Path}) -> + Key = {<<"docs">>, DocId, Pos, Rev}, + Future = fabric2_db:get(TxDb, Key), + fdb_to_doc(DocId, Pos, Path, erlfdb:wait(Future)). + + +% TODO: Handle _local docs separately. +update(TxDb, #doc{} = Doc0, Options) -> + UpdateType = case lists:member(replicated_changes, Options) of + true -> replicated_changes; + false -> interactive_edit + end, + + try + FDI1 = get_fdi(TxDb, Doc0#doc.id), + Doc1 = prep_and_validate(TxDb, FDI1, Doc0, UpdateType), + Doc2 = case UpdateType of + interactive_edit -> new_revid(Doc1); + replicated_changes -> Doc1 + end, + {FDI2, Doc3} = merge_rev_tree(FDI1, Doc2, UpdateType), + + #{tx := Tx} = TxDb, + + % Delete old entry in changes feed + OldSeqKey = {<<"changes">>, FDI1#full_doc_info.update_seq}, + OldSeqKeyBin = fabric2_db:pack(TxDb, OldSeqKey), + erlfdb:clear(Tx, OldSeqKeyBin), + + % Add new entry to changes feed + NewSeqKey = {<<"changes">>, <<16#FFFFFFFFFFFFFFFFFFFF:80>>}, + NewSeqKeyBin = fabric2_db:pack_vs(TxDb, NewSeqKey), + erlfdb:set_versionstamped_key(Tx, NewSeqKeyBin, Doc3#doc.id), + + {RevStart, [Rev | _]} = Doc3#doc.revs, + + % Write document data + {NewDocKey, NewDocVal} = doc_to_fdb(TxDb, Doc3), + erlfdb:set(Tx, NewDocKey, NewDocVal), + + % Update revision tree entry + {NewFDIKey, NewFDIVal} = fdi_to_fdb(TxDb, FDI2), + erlfdb:set_versionstampled_value(Tx, NewFDIKey, NewFDIVal), + + {DocIncr, DocDelIncr} = case {FDI1, FDI2} of + {not_found, _} -> + {1, 0}; + {#full_doc_info{deleted = true}, #full_doc_info{deleted = false}} -> + {1, -1}; + {#full_doc_info{deleted = false}, 
#full_doc_info{deleted = true}} -> + {-1, 1}; + _ -> + {0, 0} + end, + + DocCountKey = {<<"meta">>, <<"stats">>, <<"doc_count">>}, + DocCountKeyBin = fabric2_db:pack(TxDb, DocCountKey), + DocDelCountKey = {<<"meta">>, <<"stats">>, <<"doc_del_count">>}, + DocDelCountKeyBin = fabric2_db:pack(TxDb, DocDelCountKey), + + if DocIncr == 0 -> ok; true -> + erlfdb:add(Tx, DocCountKeyBin, DocIncr) + end, + + if DocDelIncr == 0 -> ok; true -> + erlfdb:add(Tx, DocDelCountKeyBin, DocDelIncr) + end, + + % TODO: Count design docs separately + + % And done. + {ok, {RevStart, Rev}} + catch throw:{?MODULE, Return} -> + Return + end. + + +prep_and_validate(TxDb, not_found, Doc, UpdateType) -> + case Doc#doc.revs of + {0, []} when UpdateType == interactive_edit -> + ?RETURN({error, conflict}); + _ -> + ok + end, + prep_and_validate(TxDb, Doc, fun() -> nil end); + +prep_and_validate(TxDb, FDI, Doc, interactive_edit) -> + #doc{ + revs = {Start, Revs} + } = Doc, + + Leafs = couch_key_tree:get_all_leafs(FDI#full_doc_info.rev_tree), + LeafRevs = lists:map(fun({_Leaf, {LeafStart, [LeafRev | _] = Path}}) -> + {{LeafStart, LeafRev}, Path} + end, Leafs), + + GetDocFun = case Revs of + [PrevRev | _] -> + case lists:keyfind({Start, PrevRev}, 1, LeafRevs) of + {{Start, PrevRev}, Path} -> + fun() -> open(TxDb, Doc#doc.id, {Start, Path}) end; + false -> + ?RETURN({error, conflict}) + end; + [] -> + case FDI#full_doc_info.deleted of + true -> + fun() -> nil end; + false -> + ?RETURN({error, conflict}) + end + end, + prep_and_validate(TxDb, Doc, GetDocFun); + +prep_and_validate(TxDb, FDI, Doc, replicated_changes) -> + #full_doc_info{ + rev_tree = RevTree + } = FDI, + OldLeafs = couch_key_tree:get_all_leafs_full(RevTree), + OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _} | _]} <- OldLeafs], + + NewPath = couch_doc:to_path(Doc), + NewRevTree = couch_key_tree:merge(RevTree, NewPath), + + Leafs = couch_key_tree:get_all_leafs_full(NewRevTree), + LeafRevsFull = lists:map(fun({Start, [{RevId, _} | _]} = 
FullPath) -> + [{{Start, RevId}, FullPath}] + end, Leafs), + LeafRevsFullDict = dict:from_list(LeafRevsFull), + + #doc{ + revs = {DocStart, [DocRev | _]} + } = Doc, + DocRevId = {DocStart, DocRev}, + + IsOldLeaf = lists:member(DocRevId, OldLeafsLU), + GetDocFun = case dict:find(DocRevId, LeafRevsFullDict) of + {ok, {DocStart, RevPath}} when not IsOldLeaf -> + % An incoming replicated edit only sends us leaf + % nodes which may have included multiple updates + % we haven't seen locally. Thus we have to search + % back through the tree to find the first edit + % we do know about. + case find_prev_known_rev(DocStart, RevPath) of + not_found -> fun() -> nil end; + PrevRevs -> fun() -> open(TxDb, Doc#doc.id, PrevRevs) end + end; + _ -> + % The update merged to an internal node that we + % already know about which means we're done with + % this update. + ?RETURN({ok, []}) + end, + + prep_and_validate(TxDb, Doc, GetDocFun). + + +prep_and_validate(TxDb, Doc, GetDocBody) -> + NewDoc = case couch_doc:has_stubs(Doc) of + true -> + case GetDocBody() of + #doc{} = PrevDoc -> + couch_doc:merge_stubs(Doc, PrevDoc); + _ -> + % Force a missing stubs error + couch_doc:mege_stubs(Doc, #doc{}) + end; + false -> + Doc + end, + validate_doc_update(TxDb, NewDoc, GetDocBody), + NewDoc. + + +merge_rev_tree(FDI, Doc, interactive_edit) when FDI#full_doc_info.deleted -> + % We're recreating a document that was previously + % deleted. To check that this is a recreation from + % the root we assert that the new document has a + % revision depth of 1 (this is to avoid recreating a + % doc from a previous internal revision) and is also + % not deleted. To avoid expanding the revision tree + % unnecessarily we create a new revision based on + % the winning deleted revision. 
+ + {RevDepth, _} = Doc#doc.revs, + case RevDepth == 1 andalso not Doc#doc.deleted of + true -> + % Update the new doc based on revisions in OldInfo + #doc_info{revs=[WinningRev | _]} = couch_doc:to_doc_info(FDI), + #rev_info{rev={OldPos, OldRev}} = WinningRev, + Body = case couch_util:get_value(comp_body, Doc#doc.meta) of + CompBody when is_binary(CompBody) -> + couch_compress:decompress(CompBody); + _ -> + Doc#doc.body + end, + NewDoc = new_revid(Doc#doc{ + revs = {OldPos, [OldRev]}, + body = Body + }), + + % Merge our modified new doc into the tree + #full_doc_info{rev_tree = RevTree} = FDI, + case couch_key_tree:merge(RevTree, couch_doc:to_path(NewDoc)) of + {NewRevTree, new_leaf} -> + % We changed the revision id so inform the caller + NewFDI = FDI#full_doc_info{ + rev_tree = NewRevTree, + deleted = false + }, + {NewFDI, NewDoc}; + _ -> + throw(doc_recreation_failed) + end; + _ -> + ?RETURN({error, conflict}) + end; +merge_rev_tree(FDI, Doc, interactive_edit) -> + % We're attempting to merge a new revision into an + % undeleted document. To not be a conflict we require + % that the merge results in extending a branch. + + RevTree = FDI#full_doc_info.rev_tree, + case couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)) of + {NewRevTree, new_leaf} when not Doc#doc.deleted -> + NewFDI = FDI#full_doc_info{ + rev_tree = NewRevTree, + deleted = false + }, + {NewFDI, Doc}; + {NewRevTree, new_leaf} when Doc#doc.deleted -> + % We have to check if we just deleted this + % document completely or if it was a conflict + % resolution. + NewFDI = FDI#full_doc_info{ + rev_tree = NewRevTree, + deleted = couch_doc:is_deleted(NewRevTree) + }, + {NewFDI, Doc}; + _ -> + ?RETURN({error, conflict}) + end; +merge_rev_tree(FDI, Doc, replicated_changes) -> + % We're merging in revisions without caring about + % conflicts. Most likely this is a replication update. 
+ RevTree = FDI#full_doc_info.rev_tree, + {NewRevTree, _} = couch_key_tree:merge(RevTree, couch_doc:to_path(Doc)), + NewFDI = FDI#full_doc_info{ + rev_tree = NewRevTree, + deleted = couch_doc:is_deleted(NewRevTree) + }, + % If a replicated change did not modify the revision + % tree then we've got nothing else to do. + if NewFDI /= FDI -> ok; true -> + ?RETURN({ok, []}) + end, + {NewFDI, Doc}. + + + +validate_doc_update(TxDb, #doc{id = <<"_design/", _/binary>>} = Doc, _) -> + case catch fabric2_db:check_is_admin(TxDb) of + ok -> validate_ddoc(TxDb, Doc); + Error -> ?RETURN(Error) + end; +validate_doc_update(_TxDb, #doc{id = <<"_local/", _/binary>>}, _) -> + ok; +validate_doc_update(TxDb, Doc, GetDiskDocFun) -> + Fun = fun() -> + DiskDoc = GetDiskDocFun(), + JsonCtx = couch_util:json_user_ctx(TxDb), + SecObj = fabric2_db:get_security(TxDb), + try + lists:map(fun(VDU) -> + case VDU(Doc, DiskDoc, JsonCtx, SecObj) of + ok -> ok; + Error -> ?RETURN(Error) + end + end, fabric2_db:get_vdus(TxDb)), + ok + catch + throw:Error -> + Error + end + end, + Stat = [couchdb, query_server, vdu_process_time], + couch_stats:update_histogram(Stat, Fun). + + +validate_ddoc(TxDb, DDoc) -> + try + ok = couch_index_server:validate(TxDb, couch_doc:with_ejson_body(DDoc)) + catch + throw:{invalid_design_doc, Reason} -> + throw({bad_request, invalid_design_doc, Reason}); + throw:{compilation_error, Reason} -> + throw({bad_request, compilation_error, Reason}); + throw:Error -> + ?RETURN(Error) + end. + + +find_prev_known_rev(_Pos, []) -> + not_found; +find_prev_known_rev(Pos, [{_Rev, #doc{}} | RestPath]) -> + % doc records are skipped because these are the result + % of replication sending us an update. We're only interested + % in what we have locally since we're comparing attachment + % stubs. The replicator should never do this because it + % should only ever send leaves but the possibility exists + % so we account for it. 
+ find_prev_known_rev(Pos - 1, RestPath); +find_prev_known_rev(Pos, [{_Rev, ?REV_MISSING} | RestPath]) -> + find_prev_known_rev(Pos - 1, RestPath); +find_prev_known_rev(Pos, [{_Rev, #leaf{}} | _] = DocPath) -> + {Pos, [Rev || {Rev, _Val} <- DocPath]}. + + +new_revid(Doc) -> + #doc{ + body = Body, + revs = {OldStart, OldRevs}, + atts = Atts, + deleted = Deleted + } = Doc, + + DigestedAtts = lists:foldl(fun(Att, Acc) -> + [N, T, M] = couch_att:fetch([name, type, md5], Att), + case M == <<>> of + true -> Acc; + false -> [{N, T, M} | Acc] + end + end, [], Atts), + + Rev = case DigestedAtts of + Atts2 when length(Atts) =/= length(Atts2) -> + % We must have old style non-md5 attachments + list_to_binary(integer_to_list(couch_util:rand32())); + Atts2 -> + OldRev = case OldRevs of [] -> 0; [OldRev0 | _] -> OldRev0 end, + SigTerm = [Deleted, OldStart, OldRev, Body, Atts2], + couch_hash:md5_hash(term_to_binary(SigTerm, [{minor_version, 1}])) + end, + + Doc#doc{revs = {OldStart + 1, [Rev | OldRevs]}}. + + +doc_to_fdb(TxDb, #doc{} = Doc) -> + #doc{ + id = Id, + revs = {Start, [Rev | _]}, + body = Body, + atts = Atts, + deleted = Deleted + } = Doc, + Key = {<<"docs">>, Id, Start, Rev}, + KeyBin = fabric2_db:pack(TxDb, Key), + Val = {Body, Atts, Deleted}, + {KeyBin, term_to_binary(Val, [{minor_version, 1}])}. + + +fdb_to_doc(DocId, Pos, Path, Bin) when is_binary(Bin) -> + {Body, Atts, Deleted} = binary_to_term(Bin, [safe]), + #doc{ + id = DocId, + revs = {Pos, Path}, + body = Body, + atts = Atts, + deleted = Deleted + }; +fdb_to_doc(_DocId, _Pos, _Path, not_found) -> + {not_found, missing}. + + +fdi_to_fdb(TxDb, #full_doc_info{} = FDI) -> + #full_doc_info{ + id = Id, + deleted = Deleted, + rev_tree = RevTree + } = flush_tree(FDI), + + Key = {<<"revs">>, Id}, + KeyBin = fabric2_db:pack(TxDb, Key), + Val = {Deleted, RevTree, <<16#FFFFFFFFFFFFFFFFFFFF:80>>}, + {KeyBin, term_to_binary(Val, [{minor_version, 1}])}. 
+ + +fdb_to_fdi(Id, Bin) when is_binary(Bin) -> + {Deleted, RevTree, UpdateSeq} = binary_to_term(Bin, [safe]), + #full_doc_info{ + id = Id, + deleted = Deleted, + rev_tree = RevTree, + update_seq = UpdateSeq + }; +fdb_to_fdi(_Id, not_found) -> + not_found. + + +flush_tree(FDI) -> + #full_doc_info{ + rev_tree = Unflushed + } = FDI, + + Flushed = couch_key_tree:map(fun(_Rev, Value) -> + case Value of + #doc{deleted = Del} -> #leaf{deleted = Del}; + _ -> Value + end + end, Unflushed), + + FDI#full_doc_info{ + rev_tree = Flushed + }. |