summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/fabric/include/fabric2.hrl5
-rw-r--r--src/fabric/src/fabric2_db.erl16
-rw-r--r--src/fabric/src/fabric2_fdb.erl126
-rw-r--r--src/fabric/src/fabric2_util.erl16
-rw-r--r--src/fabric/test/fabric2_doc_att_tests.erl285
5 files changed, 428 insertions, 20 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index 189995de2..b4dd084a2 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -37,12 +37,15 @@
-define(DB_ATTS, 23).
-define(DB_VIEWS, 24).
-define(DB_LOCAL_DOC_BODIES, 25).
+-define(DB_ATT_NAMES, 26).
% Versions
--define(CURR_REV_FORMAT, 0).
+% 0 - Initial implementation
+% 1 - Added attachment hash
+-define(CURR_REV_FORMAT, 1).
% Misc constants
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 88840e702..6d015df0e 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -1341,7 +1341,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
#doc{
deleted = NewDeleted,
- revs = {NewRevPos, [NewRev | NewRevPath]}
+ revs = {NewRevPos, [NewRev | NewRevPath]},
+ atts = Atts
} = Doc4 = stem_revisions(Db, Doc3),
NewRevInfo = #{
@@ -1350,7 +1351,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
rev_id => {NewRevPos, NewRev},
rev_path => NewRevPath,
sequence => undefined,
- branch_count => undefined
+ branch_count => undefined,
+ att_hash => fabric2_util:hash_atts(Atts)
},
% Gather the list of possible winnig revisions
@@ -1405,7 +1407,8 @@ update_doc_replicated(Db, Doc0, _Options) ->
rev_id => {RevPos, Rev},
rev_path => RevPath,
sequence => undefined,
- branch_count => undefined
+ branch_count => undefined,
+ att_hash => <<>>
},
AllRevInfos = fabric2_fdb:get_all_revs(Db, DocId),
@@ -1444,6 +1447,9 @@ update_doc_replicated(Db, Doc0, _Options) ->
PrevRevInfo = find_prev_revinfo(RevPos, LeafPath),
Doc2 = prep_and_validate(Db, Doc1, PrevRevInfo),
Doc3 = flush_doc_atts(Db, Doc2),
+ DocRevInfo2 = DocRevInfo1#{
+ atts_hash => fabric2_util:hash_atts(Doc3#doc.atts)
+ },
% Possible winners are the previous winner and
% the new DocRevInfo
@@ -1453,9 +1459,9 @@ update_doc_replicated(Db, Doc0, _Options) ->
end,
{NewWinner0, NonWinner} = case Winner == PrevRevInfo of
true ->
- {DocRevInfo1, not_found};
+ {DocRevInfo2, not_found};
false ->
- [W, NW] = fabric2_util:sort_revinfos([Winner, DocRevInfo1]),
+ [W, NW] = fabric2_util:sort_revinfos([Winner, DocRevInfo2]),
{W, NW}
end,
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index fb2891be7..404460ed5 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -580,9 +580,40 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
#doc{
id = DocId,
- deleted = Deleted
+ deleted = Deleted,
+ atts = Atts
} = Doc,
+ % Doc body
+
+ ok = write_doc_body(Db, Doc),
+
+ % Attachment bookkeeping
+
+ % If a document's attachments have changed we have to scan
+ % for any attachments that may need to be deleted. The check
+ % for `>= 2` is a bit subtle. The important point is that
+ % one of the revisions will be from the new document so we
+ % have to find at least one more beyond that to assert that
+ % the attachments have not changed.
+ AttHash = fabric2_util:hash_atts(Atts),
+ RevsToCheck = [NewWinner0] ++ ToUpdate ++ ToRemove,
+ AttHashCount = lists:foldl(fun(Att, Count) ->
+ #{att_hash := RevAttHash} = Att,
+ case RevAttHash == AttHash of
+ true -> Count + 1;
+ false -> Count
+ end
+ end, 0, RevsToCheck),
+ if
+ AttHashCount == length(RevsToCheck) ->
+ ok;
+ AttHashCount >= 2 ->
+ ok;
+ true ->
+ cleanup_attachments(Db, DocId, Doc, ToRemove)
+ end,
+
% Revision tree
NewWinner = NewWinner0#{winner := true},
@@ -649,8 +680,6 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
% And all the rest...
- ok = write_doc_body(Db, Doc),
-
IsDDoc = case Doc#doc.id of
<<?DESIGN_DOC_PREFIX, _/binary>> -> true;
_ -> false
@@ -755,6 +784,9 @@ write_attachment(#{} = Db, DocId, Data) when is_binary(Data) ->
AttId = fabric2_util:uuid(),
Chunks = chunkify_binary(Data),
+ IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
+ ok = erlfdb:set(Tx, IdKey, <<>>),
+
lists:foldl(fun(Chunk, ChunkId) ->
AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix),
ok = erlfdb:set(Tx, AttKey, Chunk),
@@ -1014,16 +1046,71 @@ clear_doc_body(#{} = Db, DocId, #{} = RevInfo) ->
ok = erlfdb:clear_range(Tx, StartKey, EndKey).
+cleanup_attachments(Db, DocId, NewDoc, ToRemove) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = Db,
+
+ RemoveRevs = lists:map(fun(#{rev_id := RevId}) -> RevId end, ToRemove),
+
+ % Gather all known document revisions
+ {ok, DiskDocs} = fabric2_db:open_doc_revs(Db, DocId, all, []),
+ AllDocs = [{ok, NewDoc} | DiskDocs],
+
+ % Get referenced attachment ids
+ ActiveIdSet = lists:foldl(fun({ok, Doc}, Acc) ->
+ #doc{
+ revs = {Pos, [Rev | _]}
+ } = Doc,
+ case lists:member({Pos, Rev}, RemoveRevs) of
+ true ->
+ Acc;
+ false ->
+ lists:foldl(fun(Att, InnerAcc) ->
+ {loc, _Db, _DocId, AttId} = couch_att:fetch(data, Att),
+ sets:add_element(AttId, InnerAcc)
+ end, Acc, Doc#doc.atts)
+ end
+ end, sets:new(), AllDocs),
+
+ AttPrefix = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId}, DbPrefix),
+ Options = [{streaming_mode, want_all}],
+ Future = erlfdb:get_range_startswith(Tx, AttPrefix, Options),
+
+ ExistingIdSet = lists:foldl(fun({K, _}, Acc) ->
+ {?DB_ATT_NAMES, DocId, AttId} = erlfdb_tuple:unpack(K, DbPrefix),
+ sets:add_element(AttId, Acc)
+ end, sets:new(), erlfdb:wait(Future)),
+
+ AttsToRemove = sets:subtract(ExistingIdSet, ActiveIdSet),
+
+ lists:foreach(fun(AttId) ->
+ IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
+ erlfdb:clear(Tx, IdKey),
+
+ ChunkKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+ erlfdb:clear_range_startswith(Tx, ChunkKey)
+ end, sets:to_list(AttsToRemove)).
+
+
revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevId) ->
#{
deleted := Deleted,
rev_id := {RevPos, Rev},
rev_path := RevPath,
- branch_count := BranchCount
+ branch_count := BranchCount,
+ att_hash := AttHash
} = RevId,
VS = new_versionstamp(Tx),
Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev},
- Val = {?CURR_REV_FORMAT, VS, BranchCount, list_to_tuple(RevPath)},
+ Val = {
+ ?CURR_REV_FORMAT,
+ VS,
+ BranchCount,
+ list_to_tuple(RevPath),
+ AttHash
+ },
KBin = erlfdb_tuple:pack(Key, DbPrefix),
VBin = erlfdb_tuple:pack_vs(Val),
{KBin, VBin, VS};
@@ -1032,38 +1119,49 @@ revinfo_to_fdb(_Tx, DbPrefix, DocId, #{} = RevId) ->
#{
deleted := Deleted,
rev_id := {RevPos, Rev},
- rev_path := RevPath
+ rev_path := RevPath,
+ att_hash := AttHash
} = RevId,
Key = {?DB_REVS, DocId, not Deleted, RevPos, Rev},
- Val = {?CURR_REV_FORMAT, list_to_tuple(RevPath)},
+ Val = {?CURR_REV_FORMAT, list_to_tuple(RevPath), AttHash},
KBin = erlfdb_tuple:pack(Key, DbPrefix),
VBin = erlfdb_tuple:pack(Val),
{KBin, VBin, undefined}.
-fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _} = Val) ->
+fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _, _, _} = Val) ->
{?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
- {_RevFormat, Sequence, BranchCount, RevPath} = Val,
+ {_RevFormat, Sequence, BranchCount, RevPath, AttHash} = Val,
#{
winner => true,
deleted => not NotDeleted,
rev_id => {RevPos, Rev},
rev_path => tuple_to_list(RevPath),
sequence => Sequence,
- branch_count => BranchCount
+ branch_count => BranchCount,
+ att_hash => AttHash
};
-fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _} = Val) ->
+fdb_to_revinfo(Key, {?CURR_REV_FORMAT, _, _} = Val) ->
{?DB_REVS, _DocId, NotDeleted, RevPos, Rev} = Key,
- {_RevFormat, RevPath} = Val,
+ {_RevFormat, RevPath, AttHash} = Val,
#{
winner => false,
deleted => not NotDeleted,
rev_id => {RevPos, Rev},
rev_path => tuple_to_list(RevPath),
sequence => undefined,
- branch_count => undefined
- }.
+ branch_count => undefined,
+ att_hash => AttHash
+ };
+
+fdb_to_revinfo(Key, {0, Seq, BCount, RPath}) ->
+ Val = {?CURR_REV_FORMAT, Seq, BCount, RPath, <<>>},
+ fdb_to_revinfo(Key, Val);
+
+fdb_to_revinfo(Key, {0, RPath}) ->
+ Val = {?CURR_REV_FORMAT, RPath, <<>>},
+ fdb_to_revinfo(Key, Val).
doc_to_fdb(Db, #doc{} = Doc) ->
diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl
index 2b8e49ebf..e3f94f546 100644
--- a/src/fabric/src/fabric2_util.erl
+++ b/src/fabric/src/fabric2_util.erl
@@ -25,6 +25,8 @@
validate_security_object/1,
+ hash_atts/1,
+
dbname_ends_with/2,
get_value/2,
@@ -124,6 +126,20 @@ validate_json_list_of_strings(Member, Props) ->
end.
+hash_atts([]) ->
+ <<>>;
+
+hash_atts(Atts) ->
+ lists:sort(fun(A, B) ->
+ couch_att:fetch(name, A) =< couch_att:fetch(name, B)
+ end, Atts),
+ Md5St = lists:foldl(fun(Att, Acc) ->
+ {loc, _Db, _DocId, AttId} = couch_att:fetch(data, Att),
+ couch_hash:md5_hash_update(Acc, AttId)
+ end, couch_hash:md5_hash_init(), Atts),
+ couch_hash:md5_hash_final(Md5St).
+
+
dbname_ends_with(#{} = Db, Suffix) ->
dbname_ends_with(fabric2_db:name(Db), Suffix);
diff --git a/src/fabric/test/fabric2_doc_att_tests.erl b/src/fabric/test/fabric2_doc_att_tests.erl
new file mode 100644
index 000000000..331e1a4e8
--- /dev/null
+++ b/src/fabric/test/fabric2_doc_att_tests.erl
@@ -0,0 +1,285 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_doc_att_tests).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include("fabric2.hrl").
+-include("fabric2_test.hrl").
+
+
+doc_crud_test_() ->
+ {
+ "Test document CRUD operations",
+ {
+ setup,
+ fun setup/0,
+ fun cleanup/1,
+ with([
+ ?TDEF(create_att),
+ ?TDEF(delete_att),
+ ?TDEF(multiple_atts),
+ ?TDEF(delete_one_att),
+ ?TDEF(large_att),
+ ?TDEF(att_on_conflict_isolation)
+ ])
+ }
+ }.
+
+
+setup() ->
+ Ctx = test_util:start_couch([fabric]),
+ {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]),
+ {Db, Ctx}.
+
+
+cleanup({Db, Ctx}) ->
+ ok = fabric2_db:delete(fabric2_db:name(Db), []),
+ test_util:stop_couch(Ctx).
+
+
+create_att({Db, _}) ->
+ DocId = fabric2_util:uuid(),
+ Att1 = couch_att:new([
+ {name, <<"foo.txt">>},
+ {type, <<"application/octet-stream">>},
+ {att_len, 6},
+ {data, <<"foobar">>},
+ {encoding, identity},
+ {md5, <<>>}
+ ]),
+ Doc1 = #doc{
+ id = DocId,
+ atts = [Att1]
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc1),
+ {ok, Doc2} = fabric2_db:open_doc(Db, DocId),
+ #doc{
+ atts = [Att2]
+ } = Doc2,
+ {loc, _Db, DocId, AttId} = couch_att:fetch(data, Att2),
+ AttData = fabric2_db:read_attachment(Db, DocId, AttId),
+ ?assertEqual(<<"foobar">>, AttData),
+
+ % Check that the raw keys exist
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+ IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
+ AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+
+ fabric2_fdb:transactional(fun(Tx) ->
+ IdVal = erlfdb:wait(erlfdb:get(Tx, IdKey)),
+ AttVals = erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)),
+
+ ?assertEqual(<<>>, IdVal),
+ ?assertMatch([{_, <<"foobar">>}], AttVals)
+ end).
+
+
+delete_att({Db, _}) ->
+ DocId = fabric2_util:uuid(),
+ Att1 = couch_att:new([
+ {name, <<"foo.txt">>},
+ {type, <<"application/octet-stream">>},
+ {att_len, 6},
+ {data, <<"foobar">>},
+ {encoding, identity},
+ {md5, <<>>}
+ ]),
+ Doc1 = #doc{
+ id = DocId,
+ atts = [Att1]
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc1),
+ {ok, Doc2} = fabric2_db:open_doc(Db, DocId),
+ #doc{
+ atts = [Att2]
+ } = Doc2,
+ {loc, _Db, DocId, AttId} = couch_att:fetch(data, Att2),
+
+ Doc3 = Doc2#doc{atts = []},
+ {ok, _} = fabric2_db:update_doc(Db, Doc3),
+
+ {ok, Doc4} = fabric2_db:open_doc(Db, DocId),
+ ?assertEqual([], Doc4#doc.atts),
+
+ % Check that the raw keys were removed
+ #{
+ db_prefix := DbPrefix
+ } = Db,
+ IdKey = erlfdb_tuple:pack({?DB_ATT_NAMES, DocId, AttId}, DbPrefix),
+ AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+
+ fabric2_fdb:transactional(fun(Tx) ->
+ IdVal = erlfdb:wait(erlfdb:get(Tx, IdKey)),
+ AttVals = erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)),
+
+ ?assertEqual(not_found, IdVal),
+ ?assertMatch([], AttVals)
+ end).
+
+
+multiple_atts({Db, _}) ->
+ DocId = fabric2_util:uuid(),
+ Atts = [
+ mk_att(<<"foo.txt">>, <<"foobar">>),
+ mk_att(<<"bar.txt">>, <<"barfoo">>),
+ mk_att(<<"baz.png">>, <<"blargh">>)
+ ],
+ {ok, _} = create_doc(Db, DocId, Atts),
+ ?assertEqual(
+ #{
+ <<"foo.txt">> => <<"foobar">>,
+ <<"bar.txt">> => <<"barfoo">>,
+ <<"baz.png">> => <<"blargh">>
+ },
+ read_atts(Db, DocId)
+ ).
+
+
+delete_one_att({Db, _}) ->
+ DocId = fabric2_util:uuid(),
+ Atts1 = [
+ mk_att(<<"foo.txt">>, <<"foobar">>),
+ mk_att(<<"bar.txt">>, <<"barfoo">>),
+ mk_att(<<"baz.png">>, <<"blargh">>)
+ ],
+ {ok, RevId} = create_doc(Db, DocId, Atts1),
+ Atts2 = tl(Atts1),
+ {ok, _} = update_doc(Db, DocId, RevId, stubify(RevId, Atts2)),
+ ?assertEqual(
+ #{
+ <<"bar.txt">> => <<"barfoo">>,
+ <<"baz.png">> => <<"blargh">>
+ },
+ read_atts(Db, DocId)
+ ).
+
+
+large_att({Db, _}) ->
+ DocId = fabric2_util:uuid(),
+ % Total size ~360,000 bytes
+ AttData = iolist_to_binary([
+ <<"foobar">> || _ <- lists:seq(1, 60000)
+ ]),
+ Att1 = mk_att("long.txt", AttData),
+ {ok, _} = create_doc(Db, DocId, [Att1]),
+ ?assertEqual(#{"long.txt" => AttData}, read_atts(Db, DocId)),
+
+ {ok, Doc} = fabric2_db:open_doc(Db, DocId),
+ #doc{atts = [Att2]} = Doc,
+ {loc, _Db, DocId, AttId} = couch_att:fetch(data, Att2),
+
+ #{db_prefix := DbPrefix} = Db,
+ AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+ fabric2_fdb:transactional(fun(Tx) ->
+ AttVals = erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)),
+ ?assertEqual(4, length(AttVals))
+ end).
+
+
+att_on_conflict_isolation({Db, _}) ->
+ DocId = fabric2_util:uuid(),
+ [PosRevA1, PosRevB1] = create_conflicts(Db, DocId, []),
+ Att = mk_att(<<"happy_goat.tiff">>, <<":D>">>),
+ {ok, PosRevA2} = update_doc(Db, DocId, PosRevA1, [Att]),
+ ?assertEqual(
+ #{<<"happy_goat.tiff">> => <<":D>">>},
+ read_atts(Db, DocId, PosRevA2)
+ ),
+ ?assertEqual(#{}, read_atts(Db, DocId, PosRevB1)).
+
+
+mk_att(Name, Data) ->
+ couch_att:new([
+ {name, Name},
+ {type, <<"application/octet-stream">>},
+ {att_len, size(Data)},
+ {data, Data},
+ {encoding, identity},
+ {md5, <<>>}
+ ]).
+
+
+stubify(RevId, Atts) when is_list(Atts) ->
+ lists:map(fun(Att) ->
+ stubify(RevId, Att)
+ end, Atts);
+
+stubify({Pos, _Rev}, Att) ->
+ couch_att:store([
+ {data, stub},
+ {revpos, Pos}
+ ], Att).
+
+
+create_doc(Db, DocId, Atts) ->
+ Doc = #doc{
+ id = DocId,
+ atts = Atts
+ },
+ fabric2_db:update_doc(Db, Doc).
+
+
+update_doc(Db, DocId, {Pos, Rev}, Atts) ->
+ Doc = #doc{
+ id = DocId,
+ revs = {Pos, [Rev]},
+ atts = Atts
+ },
+ fabric2_db:update_doc(Db, Doc).
+
+
+create_conflicts(Db, DocId, Atts) ->
+ Base = #doc{
+ id = DocId,
+ atts = Atts
+ },
+ {ok, {_, Rev1} = PosRev} = fabric2_db:update_doc(Db, Base),
+ <<Rev2:16/binary, Rev3:16/binary>> = fabric2_util:uuid(),
+ Doc1 = #doc{
+ id = DocId,
+ revs = {2, [Rev2, Rev1]},
+ atts = stubify(PosRev, Atts)
+ },
+ Doc2 = #doc{
+ id = DocId,
+ revs = {2, [Rev3, Rev1]},
+ atts = stubify(PosRev, Atts)
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc1, [replicated_changes]),
+ {ok, _} = fabric2_db:update_doc(Db, Doc2, [replicated_changes]),
+ lists:reverse(lists:sort([{2, Rev2}, {2, Rev3}])).
+
+
+read_atts(Db, DocId) ->
+ {ok, #doc{atts = Atts}} = fabric2_db:open_doc(Db, DocId),
+ atts_to_map(Db, DocId, Atts).
+
+
+read_atts(Db, DocId, PosRev) ->
+ {ok, Docs} = fabric2_db:open_doc_revs(Db, DocId, [PosRev], []),
+ [{ok, #doc{atts = Atts}}] = Docs,
+ atts_to_map(Db, DocId, Atts).
+
+
+atts_to_map(Db, DocId, Atts) ->
+ lists:foldl(fun(Att, Acc) ->
+ [Name, Data] = couch_att:fetch([name, data], Att),
+ {loc, _Db, DocId, AttId} = Data,
+ AttBin = fabric2_db:read_attachment(Db, DocId, AttId),
+ maps:put(Name, AttBin, Acc)
+ end, #{}, Atts).