diff options
-rw-r--r-- | src/fabric/include/fabric2.hrl | 5 | ||||
-rw-r--r-- | src/fabric/src/fabric2_db.erl | 16 | ||||
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 126 | ||||
-rw-r--r-- | src/fabric/src/fabric2_util.erl | 16 | ||||
-rw-r--r-- | src/fabric/test/fabric2_doc_att_tests.erl | 285 |
5 files changed, 428 insertions, 20 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl index 189995de2..b4dd084a2 100644 --- a/src/fabric/include/fabric2.hrl +++ b/src/fabric/include/fabric2.hrl @@ -37,12 +37,15 @@ -define(DB_ATTS, 23). -define(DB_VIEWS, 24). -define(DB_LOCAL_DOC_BODIES, 25). +-define(DB_ATT_NAMES, 26). % Versions --define(CURR_REV_FORMAT, 0). +% 0 - Initial implementation +% 1 - Added attachment hash +-define(CURR_REV_FORMAT, 1). % Misc constants diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl index 88840e702..6d015df0e 100644 --- a/src/fabric/src/fabric2_db.erl +++ b/src/fabric/src/fabric2_db.erl @@ -1341,7 +1341,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) -> #doc{ deleted = NewDeleted, - revs = {NewRevPos, [NewRev | NewRevPath]} + revs = {NewRevPos, [NewRev | NewRevPath]}, + atts = Atts } = Doc4 = stem_revisions(Db, Doc3), NewRevInfo = #{ @@ -1350,7 +1351,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) -> rev_id => {NewRevPos, NewRev}, rev_path => NewRevPath, sequence => undefined, - branch_count => undefined + branch_count => undefined, + att_hash => fabric2_util:hash_atts(Atts) }, % Gather the list of possible winnig revisions @@ -1405,7 +1407,8 @@ update_doc_replicated(Db, Doc0, _Options) -> rev_id => {RevPos, Rev}, rev_path => RevPath, sequence => undefined, - branch_count => undefined + branch_count => undefined, + att_hash => <<>> }, AllRevInfos = fabric2_fdb:get_all_revs(Db, DocId), @@ -1444,6 +1447,9 @@ update_doc_replicated(Db, Doc0, _Options) -> PrevRevInfo = find_prev_revinfo(RevPos, LeafPath), Doc2 = prep_and_validate(Db, Doc1, PrevRevInfo), Doc3 = flush_doc_atts(Db, Doc2), + DocRevInfo2 = DocRevInfo1#{ + atts_hash => fabric2_util:hash_atts(Doc3#doc.atts) + }, % Possible winners are the previous winner and % the new DocRevInfo @@ -1453,9 +1459,9 @@ update_doc_replicated(Db, Doc0, _Options) -> end, {NewWinner0, NonWinner} = case Winner == PrevRevInfo of true -> - {DocRevInfo1, not_found}; + {DocRevInfo2, not_found}; false -> - [W, NW] = fabric2_util:sort_revinfos([Winner, DocRevInfo1]), + [W, NW] = fabric2_util:sort_revinfos([Winner, DocRevInfo2]), {W, NW} end, diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index fb2891be7..404460ed5 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -580,9 +580,40 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) -> #doc{ id = DocId, - deleted = Deleted + deleted = Deleted, + atts = Atts } = Doc, + % Doc body + + ok = write_doc_body(Db, Doc), + + % Attachment bookkeeping + + % If a document's attachments have changed we have to scan + % for any attachments that may need to be deleted. The check + % for `>= 2` is a bit subtle. The important point is that + % one of the revisions will be from the new document so we + % have to find at least one more beyond that to assert that + % the attachments have not changed. + AttHash = fabric2_util:hash_atts(Atts), + RevsToCheck = [NewWinner0] ++ ToUpdate ++ ToRemove, + AttHashCount = lists:foldl(fun(Att, Count) -> + #{att_hash := RevAttHash} = Att, + case RevAttHash == AttHash of + true -> Count + 1; + false -> Count + end + end, 0, RevsToCheck), + if + AttHashCount == length(RevsToCheck) -> + ok; + AttHashCount >= 2 -> + ok; + true -> + cleanup_attachments(Db, DocId, Doc, ToRemove) + end, + % Revision tree NewWinner = NewWinner0#{winner := true}, @@ -649,8 +680,6 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) -> % And all the rest... - ok = write_doc_body(Db, Doc), - IsDDoc = case Doc#doc.id of <<?DESIGN_DOC_PREFIX, _/binary>> -> true; _ -> false @@ -755,6 +784,9 @@ write_attachment(#{} = Db, DocId, Data) when is_binary(Data) -> AttId = fabric2_util:uuid(), Chunks = chunkify_binary(Data), + IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix), + ok = erlfdb:set(Tx, IdKey, <<>>), + lists:foldl(fun(Chunk, ChunkId) -> AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix), ok = erlfdb:set(Tx, AttKey, Chunk), @@ -1014,16 +1046,71 @@ clear_doc_body(#{} = Db, DocId, #{} = RevInfo) -> ok = erlfdb:clear_range(Tx, StartKey, EndKey). +cleanup_attachments(Db, DocId, NewDoc, ToRemove) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = Db, + + RemoveRevs = lists:map(fun(#{rev_id := RevId}) -> RevId end, ToRemove), + + % Gather all known document revisions + {ok, DiskDocs} = fabric2_db:open_doc_revs(Db, DocId, all, []), + AllDocs = [{ok, NewDoc} | DiskDocs], + + % Get referenced attachment ids + ActiveIdSet = lists:foldl(fun({ok, Doc}, Acc) -> + #doc{ + revs = {Pos, [Rev | _]} + } = Doc, + case lists:member({Pos, Rev}, RemoveRevs) of + true -> + Acc; + false -> + lists:foldl(fun(Att, InnerAcc) -> + {loc, _Db, _DocId, AttId} = couch_att:fetch(data, Att), + sets:add_element(AttId, InnerAcc) + end, Acc, Doc#doc.atts) + end + end, sets:new(), AllDocs), + + AttPrefix = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId}, DbPrefix), + Options = [{streaming_mode, want_all}], + Future = erlfdb:get_range_startswith(Tx, AttPrefix, Options), + + ExistingIdSet = lists:foldl(fun({K, _}, Acc) -> + {?DB_ATT_NAMES, DocId, AttId} = erlfdb_tuple:unpack(K, DbPrefix), + sets:add_element(AttId, Acc) + end, sets:new(), erlfdb:wait(Future)), + + AttsToRemove = sets:subtract(ExistingIdSet, ActiveIdSet), + + lists:foreach(fun(AttId) -> + IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix), + erlfdb:clear(Tx, IdKey), + + ChunkKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix), + erlfdb:clear_range_startswith(Tx, ChunkKey) + end, sets:to_list(AttsToRemove)). + + revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevId) -> #{ deleted := Deleted, rev_id := {RevPos, Rev}, rev_path := RevPath, - branch_count := BranchCount + branch_count := BranchCount, + att_hash := AttHash } = RevId, VS = new_versionstamp(Tx), Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev}, - Val = {?CURR_REV_FORMAT, VS, BranchCount, list_to_tuple(RevPath)}, + Val = { + ?CURR_REV_FORMAT, + VS, + BranchCount, + list_to_tuple(RevPath), + AttHash + }, KBin = erlfdb_tuple:pack(Key, DbPrefix), VBin = erlfdb_tuple:pack_vs(Val), {KBin, VBin, VS}; @@ -1032,38 +1119,49 @@ revinfo_to_fdb(_Tx, DbPrefix, DocId, #{} = RevId) -> #{ deleted := Deleted, rev_id := {RevPos, Rev}, - rev_path := RevPath + rev_path := RevPath, + att_hash := AttHash } = RevId, Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev}, - Val = {?CURR_REV_FORMAT, list_to_tuple(RevPath)}, + Val = {?CURR_REV_FORMAT, list_to_tuple(RevPath), AttHash}, KBin = erlfdb_tuple:pack(Key, DbPrefix), VBin = erlfdb_tuple:pack(Val), {KBin, VBin, undefined}. -fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _} = Val) -> +fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _, _} = Val) -> {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key, - {_RevFormat, Sequence, BranchCount, RevPath} = Val, + {_RevFormat, Sequence, BranchCount, RevPath, AttHash} = Val, #{ winner => true, deleted => not NotDeleted, rev_id => {RevPos, Rev}, rev_path => tuple_to_list(RevPath), sequence => Sequence, - branch_count => BranchCount + branch_count => BranchCount, + att_hash => AttHash }; -fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _} = Val) -> +fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _} = Val) -> {?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key, - {_RevFormat, RevPath} = Val, + {_RevFormat, RevPath, AttHash} = Val, #{ winner => false, deleted => not NotDeleted, rev_id => {RevPos, Rev}, rev_path => tuple_to_list(RevPath), sequence => undefined, - branch_count => undefined - }. + branch_count => undefined, + att_hash => AttHash + }; + +fdb_to_revinfo(Key, {0, Seq, BCount, RPath}) -> + Val = {?CURR_REV_FORMAT, Seq, BCount, RPath, <<>>}, + fdb_to_revinfo(Key, Val); + +fdb_to_revinfo(Key, {0, RPath}) -> + Val = {?CURR_REV_FORMAT, RPath, <<>>}, + fdb_to_revinfo(Key, Val). doc_to_fdb(Db, #doc{} = Doc) -> diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl index 2b8e49ebf..e3f94f546 100644 --- a/src/fabric/src/fabric2_util.erl +++ b/src/fabric/src/fabric2_util.erl @@ -25,6 +25,8 @@ validate_security_object/1, + hash_atts/1, + dbname_ends_with/2, get_value/2, @@ -124,6 +126,20 @@ validate_json_list_of_strings(Member, Props) -> end. +hash_atts([]) -> + <<>>; + +hash_atts(Atts) -> + lists:sort(fun(A, B) -> + couch_att:fetch(name, A) =< couch_att:fetch(name, B) + end, Atts), + Md5St = lists:foldl(fun(Att, Acc) -> + {loc, _Db, _DocId, AttId} = couch_att:fetch(data, Att), + couch_hash:md5_hash_update(Acc, AttId) + end, couch_hash:md5_hash_init(), Atts), + couch_hash:md5_hash_final(Md5St). + + dbname_ends_with(#{} = Db, Suffix) -> dbname_ends_with(fabric2_db:name(Db), Suffix); diff --git a/src/fabric/test/fabric2_doc_att_tests.erl b/src/fabric/test/fabric2_doc_att_tests.erl new file mode 100644 index 000000000..331e1a4e8 --- /dev/null +++ b/src/fabric/test/fabric2_doc_att_tests.erl @@ -0,0 +1,285 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_doc_att_tests). + + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include("fabric2.hrl"). +-include("fabric2_test.hrl"). + + +doc_crud_test_() -> + { + "Test document CRUD operations", + { + setup, + fun setup/0, + fun cleanup/1, + with([ + ?TDEF(create_att), + ?TDEF(delete_att), + ?TDEF(multiple_atts), + ?TDEF(delete_one_att), + ?TDEF(large_att), + ?TDEF(att_on_conflict_isolation) + ]) + } + }. + + +setup() -> + Ctx = test_util:start_couch([fabric]), + {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]), + {Db, Ctx}. + + +cleanup({Db, Ctx}) -> + ok = fabric2_db:delete(fabric2_db:name(Db), []), + test_util:stop_couch(Ctx). + + +create_att({Db, _}) -> + DocId = fabric2_util:uuid(), + Att1 = couch_att:new([ + {name, <<"foo.txt">>}, + {type, <<"application/octet-stream">>}, + {att_len, 6}, + {data, <<"foobar">>}, + {encoding, identity}, + {md5, <<>>} + ]), + Doc1 = #doc{ + id = DocId, + atts = [Att1] + }, + {ok, _} = fabric2_db:update_doc(Db, Doc1), + {ok, Doc2} = fabric2_db:open_doc(Db, DocId), + #doc{ + atts = [Att2] + } = Doc2, + {loc, _Db, DocId, AttId} = couch_att:fetch(data, Att2), + AttData = fabric2_db:read_attachment(Db, DocId, AttId), + ?assertEqual(<<"foobar">>, AttData), + + % Check that the raw keys exist + #{ + db_prefix := DbPrefix + } = Db, + IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix), + AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix), + + fabric2_fdb:transactional(fun(Tx) -> + IdVal = erlfdb:wait(erlfdb:get(Tx, IdKey)), + AttVals = erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)), + + ?assertEqual(<<>>, IdVal), + ?assertMatch([{_, <<"foobar">>}], AttVals) + end). + + +delete_att({Db, _}) -> + DocId = fabric2_util:uuid(), + Att1 = couch_att:new([ + {name, <<"foo.txt">>}, + {type, <<"application/octet-stream">>}, + {att_len, 6}, + {data, <<"foobar">>}, + {encoding, identity}, + {md5, <<>>} + ]), + Doc1 = #doc{ + id = DocId, + atts = [Att1] + }, + {ok, _} = fabric2_db:update_doc(Db, Doc1), + {ok, Doc2} = fabric2_db:open_doc(Db, DocId), + #doc{ + atts = [Att2] + } = Doc2, + {loc, _Db, DocId, AttId} = couch_att:fetch(data, Att2), + + Doc3 = Doc2#doc{atts = []}, + {ok, _} = fabric2_db:update_doc(Db, Doc3), + + {ok, Doc4} = fabric2_db:open_doc(Db, DocId), + ?assertEqual([], Doc4#doc.atts), + + % Check that the raw keys were removed + #{ + db_prefix := DbPrefix + } = Db, + IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix), + AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix), + + fabric2_fdb:transactional(fun(Tx) -> + IdVal = erlfdb:wait(erlfdb:get(Tx, IdKey)), + AttVals = erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)), + + ?assertEqual(not_found, IdVal), + ?assertMatch([], AttVals) + end). + + +multiple_atts({Db, _}) -> + DocId = fabric2_util:uuid(), + Atts = [ + mk_att(<<"foo.txt">>, <<"foobar">>), + mk_att(<<"bar.txt">>, <<"barfoo">>), + mk_att(<<"baz.png">>, <<"blargh">>) + ], + {ok, _} = create_doc(Db, DocId, Atts), + ?assertEqual( + #{ + <<"foo.txt">> => <<"foobar">>, + <<"bar.txt">> => <<"barfoo">>, + <<"baz.png">> => <<"blargh">> + }, + read_atts(Db, DocId) + ). + + +delete_one_att({Db, _}) -> + DocId = fabric2_util:uuid(), + Atts1 = [ + mk_att(<<"foo.txt">>, <<"foobar">>), + mk_att(<<"bar.txt">>, <<"barfoo">>), + mk_att(<<"baz.png">>, <<"blargh">>) + ], + {ok, RevId} = create_doc(Db, DocId, Atts1), + Atts2 = tl(Atts1), + {ok, _} = update_doc(Db, DocId, RevId, stubify(RevId, Atts2)), + ?assertEqual( + #{ + <<"bar.txt">> => <<"barfoo">>, + <<"baz.png">> => <<"blargh">> + }, + read_atts(Db, DocId) + ). + + +large_att({Db, _}) -> + DocId = fabric2_util:uuid(), + % Total size ~360,000 bytes + AttData = iolist_to_binary([ + <<"foobar">> || _ <- lists:seq(1, 60000) + ]), + Att1 = mk_att("long.txt", AttData), + {ok, _} = create_doc(Db, DocId, [Att1]), + ?assertEqual(#{"long.txt" => AttData}, read_atts(Db, DocId)), + + {ok, Doc} = fabric2_db:open_doc(Db, DocId), + #doc{atts = [Att2]} = Doc, + {loc, _Db, DocId, AttId} = couch_att:fetch(data, Att2), + + #{db_prefix := DbPrefix} = Db, + AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix), + fabric2_fdb:transactional(fun(Tx) -> + AttVals = erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)), + ?assertEqual(4, length(AttVals)) + end). + + +att_on_conflict_isolation({Db, _}) -> + DocId = fabric2_util:uuid(), + [PosRevA1, PosRevB1] = create_conflicts(Db, DocId, []), + Att = mk_att(<<"happy_goat.tiff">>, <<":D>">>), + {ok, PosRevA2} = update_doc(Db, DocId, PosRevA1, [Att]), + ?assertEqual( + #{<<"happy_goat.tiff">> => <<":D>">>}, + read_atts(Db, DocId, PosRevA2) + ), + ?assertEqual(#{}, read_atts(Db, DocId, PosRevB1)). + + +mk_att(Name, Data) -> + couch_att:new([ + {name, Name}, + {type, <<"application/octet-stream">>}, + {att_len, size(Data)}, + {data, Data}, + {encoding, identity}, + {md5, <<>>} + ]). + + +stubify(RevId, Atts) when is_list(Atts) -> + lists:map(fun(Att) -> + stubify(RevId, Att) + end, Atts); + +stubify({Pos, _Rev}, Att) -> + couch_att:store([ + {data, stub}, + {revpos, Pos} + ], Att). + + +create_doc(Db, DocId, Atts) -> + Doc = #doc{ + id = DocId, + atts = Atts + }, + fabric2_db:update_doc(Db, Doc). + + +update_doc(Db, DocId, {Pos, Rev}, Atts) -> + Doc = #doc{ + id = DocId, + revs = {Pos, [Rev]}, + atts = Atts + }, + fabric2_db:update_doc(Db, Doc). + + +create_conflicts(Db, DocId, Atts) -> + Base = #doc{ + id = DocId, + atts = Atts + }, + {ok, {_, Rev1} = PosRev} = fabric2_db:update_doc(Db, Base), + <<Rev2:16/binary, Rev3:16/binary>> = fabric2_util:uuid(), + Doc1 = #doc{ + id = DocId, + revs = {2, [Rev2, Rev1]}, + atts = stubify(PosRev, Atts) + }, + Doc2 = #doc{ + id = DocId, + revs = {2, [Rev3, Rev1]}, + atts = stubify(PosRev, Atts) + }, + {ok, _} = fabric2_db:update_doc(Db, Doc1, [replicated_changes]), + {ok, _} = fabric2_db:update_doc(Db, Doc2, [replicated_changes]), + lists:reverse(lists:sort([{2, Rev2}, {2, Rev3}])). + + +read_atts(Db, DocId) -> + {ok, #doc{atts = Atts}} = fabric2_db:open_doc(Db, DocId), + atts_to_map(Db, DocId, Atts). + + +read_atts(Db, DocId, PosRev) -> + {ok, Docs} = fabric2_db:open_doc_revs(Db, DocId, [PosRev], []), + [{ok, #doc{atts = Atts}}] = Docs, + atts_to_map(Db, DocId, Atts). + + +atts_to_map(Db, DocId, Atts) -> + lists:foldl(fun(Att, Acc) -> + [Name, Data] = couch_att:fetch([name, data], Att), + {loc, _Db, DocId, AttId} = Data, + AttBin = fabric2_db:read_attachment(Db, DocId, AttId), + maps:put(Name, AttBin, Acc) + end, #{}, Atts). |